ahmedabu98 commented on code in PR #34140:
URL: https://github.com/apache/beam/pull/34140#discussion_r1991384776


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -739,39 +740,45 @@ public void testColumnSpecificMetricsCollection() throws 
IOException {
 
     RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
 
-    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
-    boolean writeSuccess = writerManager.write(windowedDestination, row);
-    assertTrue("Write operation should succeed", writeSuccess);
+    // Write multiple rows
+    Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
+    Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row1));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row2));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row3));
     writerManager.close();
 
     Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
         writerManager.getSerializableDataFiles();
     assertFalse("Data files should not be empty", dataFiles.isEmpty());
 
+    // Commit using the same table instance
     for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
         dataFiles.entrySet()) {
-      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
-      AppendFiles appendFiles = tableToCommit.newAppend();
+      AppendFiles appendFiles = table.newAppend();
       for (SerializableDataFile dataFile : entry.getValue()) {
-        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+        appendFiles.appendFile(dataFile.createDataFile(table.specs()));
       }
       appendFiles.commit();
-      tableToCommit.refresh();
     }
+    table.refresh(); // Single refresh after all commits
 
-    // Verify the correct table (where data was written)
-    Table dataTable =
-        catalog.loadTable(
-            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
-    dataTable.refresh();
-
-    // Verify that a snapshot exists
-    Snapshot snapshot = dataTable.currentSnapshot();
+    // Verify using the same table
+    Snapshot snapshot = table.currentSnapshot();
     assertNotNull("Table should have a snapshot after writing data", snapshot);
 
-    // Verify metrics are collected only for specified columns
-    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
-    assertNotNull(dataFile.valueCounts());
+    // Verify metrics are collected for specified columns in one file
+    DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
+    Map<Integer, Long> valueCounts = dataFile.valueCounts();
+    assertNotNull("Value counts should not be null", valueCounts);
+    assertTrue("Value counts should exist for id (column 1)", 
valueCounts.containsKey(1));
+    assertEquals("Value count for id should be 1 in this file", 1L, 
valueCounts.get(1).longValue());
+    assertTrue("Value counts should exist for name (column 2)", 
valueCounts.containsKey(2));
+    assertEquals(
+        "Value count for name should be 1 in this file", 1L, 
valueCounts.get(2).longValue());

Review Comment:
   We're writing 3 records, so shouldn't these value counts equal 3? 



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -786,52 +793,69 @@ public void testDefaultMetrics() throws IOException {
     // Create a RecordWriterManager
     RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
 
-    // Write a row
-    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
-    boolean writeSuccess = writerManager.write(windowedDestination, row);
-    assertTrue("Write operation should succeed", writeSuccess);
+    // Write multiple rows
+    Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
+    Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row1));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row2));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row3));
     writerManager.close();
 
     // Manually commit the data files to the Iceberg table
     Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
         writerManager.getSerializableDataFiles();
     assertFalse("Data files should not be empty", dataFiles.isEmpty());
 
+    // Commit using the same table instance
     for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
         dataFiles.entrySet()) {
-      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
-      AppendFiles appendFiles = tableToCommit.newAppend();
+      AppendFiles appendFiles = table.newAppend();
       for (SerializableDataFile dataFile : entry.getValue()) {
-        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+        appendFiles.appendFile(dataFile.createDataFile(table.specs()));
       }
       appendFiles.commit();
-      tableToCommit.refresh();
     }
+    table.refresh(); // Single refresh after all commits
 
-    // Verify the correct table (where data was written)
-    Table dataTable = catalog.loadTable(TableIdentifier.of("default", 
"test_default_metrics"));
-    dataTable.refresh();
-
-    // Verify that a snapshot exists
-    Snapshot snapshot = dataTable.currentSnapshot();
+    // Verify using the same table
+    Snapshot snapshot = table.currentSnapshot();
     assertNotNull("Table should have a snapshot after writing data", snapshot);
 
-    // Verify metrics are collected for all columns
-    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    // Verify metrics are collected for all columns in one file
+    DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
     assertNotNull(dataFile.valueCounts());
     assertNotNull(dataFile.nullValueCounts());
     assertNotNull(dataFile.columnSizes());
+    assertNotNull(dataFile.lowerBounds());
+    assertNotNull(dataFile.upperBounds());
 
-    // Verify metrics are collected for all columns
     Map<Integer, Long> valueCounts = dataFile.valueCounts();
     Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
     Map<Integer, Long> columnSizes = dataFile.columnSizes();
+    Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
 
     for (int i = 1; i <= ICEBERG_SCHEMA.columns().size(); i++) {
       assertTrue("Value counts should be collected for column " + i, 
valueCounts.containsKey(i));
+      assertEquals(
+          "Value count for column " + i + " should be 1 in this file",
+          1L,
+          valueCounts.get(i).longValue());
       assertTrue(
           "Null value counts should be collected for column " + i, 
nullValueCounts.containsKey(i));
       assertTrue("Column sizes should be collected for column " + i, 
columnSizes.containsKey(i));
+      assertTrue("Lower bounds should be collected for column " + i, 
lowerBounds.containsKey(i));
+      assertTrue("Upper bounds should be collected for column " + i, 
upperBounds.containsKey(i));
+
+      // Verify bounds are non-null and equal (single-row file)
+      ByteBuffer lower = lowerBounds.get(i);
+      ByteBuffer upper = upperBounds.get(i);
+      assertNotNull("Lower bound for column " + i + " should not be null", 
lower);
+      assertNotNull("Upper bound for column " + i + " should not be null", 
upper);
+      assertTrue(
+          "Lower and upper bounds for column " + i + " should be equal in 
single-row file",
+          lower.equals(upper));

Review Comment:
   We can use Iceberg utils to convert ByteBuffer to a value and verify:
   ```java
   Conversions.fromByteBuffer(
                 Types.IntegerType.get(), lowerBounds.get(<columnId>))
   ```



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -786,52 +793,69 @@ public void testDefaultMetrics() throws IOException {
     // Create a RecordWriterManager
     RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
 
-    // Write a row
-    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
-    boolean writeSuccess = writerManager.write(windowedDestination, row);
-    assertTrue("Write operation should succeed", writeSuccess);
+    // Write multiple rows
+    Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
+    Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row1));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row2));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row3));
     writerManager.close();
 
     // Manually commit the data files to the Iceberg table
     Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
         writerManager.getSerializableDataFiles();
     assertFalse("Data files should not be empty", dataFiles.isEmpty());
 
+    // Commit using the same table instance
     for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
         dataFiles.entrySet()) {
-      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
-      AppendFiles appendFiles = tableToCommit.newAppend();
+      AppendFiles appendFiles = table.newAppend();
       for (SerializableDataFile dataFile : entry.getValue()) {
-        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+        appendFiles.appendFile(dataFile.createDataFile(table.specs()));
       }
       appendFiles.commit();
-      tableToCommit.refresh();
     }
+    table.refresh(); // Single refresh after all commits
 
-    // Verify the correct table (where data was written)
-    Table dataTable = catalog.loadTable(TableIdentifier.of("default", 
"test_default_metrics"));
-    dataTable.refresh();
-
-    // Verify that a snapshot exists
-    Snapshot snapshot = dataTable.currentSnapshot();
+    // Verify using the same table
+    Snapshot snapshot = table.currentSnapshot();
     assertNotNull("Table should have a snapshot after writing data", snapshot);
 
-    // Verify metrics are collected for all columns
-    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    // Verify metrics are collected for all columns in one file
+    DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
     assertNotNull(dataFile.valueCounts());
     assertNotNull(dataFile.nullValueCounts());
     assertNotNull(dataFile.columnSizes());
+    assertNotNull(dataFile.lowerBounds());
+    assertNotNull(dataFile.upperBounds());
 
-    // Verify metrics are collected for all columns
     Map<Integer, Long> valueCounts = dataFile.valueCounts();
     Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
     Map<Integer, Long> columnSizes = dataFile.columnSizes();
+    Map<Integer, ByteBuffer> lowerBounds = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> upperBounds = dataFile.upperBounds();
 
     for (int i = 1; i <= ICEBERG_SCHEMA.columns().size(); i++) {
       assertTrue("Value counts should be collected for column " + i, 
valueCounts.containsKey(i));
+      assertEquals(
+          "Value count for column " + i + " should be 1 in this file",
+          1L,
+          valueCounts.get(i).longValue());

Review Comment:
   See previous comment for this as well



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -739,39 +740,45 @@ public void testColumnSpecificMetricsCollection() throws 
IOException {
 
     RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
 
-    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
-    boolean writeSuccess = writerManager.write(windowedDestination, row);
-    assertTrue("Write operation should succeed", writeSuccess);
+    // Write multiple rows
+    Row row1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "bbb", false).build();
+    Row row3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "ccc", true).build();
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row1));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row2));
+    assertTrue("Write operation should succeed", 
writerManager.write(windowedDestination, row3));
     writerManager.close();
 
     Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
         writerManager.getSerializableDataFiles();
     assertFalse("Data files should not be empty", dataFiles.isEmpty());
 
+    // Commit using the same table instance
     for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
         dataFiles.entrySet()) {
-      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
-      AppendFiles appendFiles = tableToCommit.newAppend();
+      AppendFiles appendFiles = table.newAppend();
       for (SerializableDataFile dataFile : entry.getValue()) {
-        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+        appendFiles.appendFile(dataFile.createDataFile(table.specs()));
       }
       appendFiles.commit();
-      tableToCommit.refresh();
     }
+    table.refresh(); // Single refresh after all commits
 
-    // Verify the correct table (where data was written)
-    Table dataTable =
-        catalog.loadTable(
-            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
-    dataTable.refresh();
-
-    // Verify that a snapshot exists
-    Snapshot snapshot = dataTable.currentSnapshot();
+    // Verify using the same table
+    Snapshot snapshot = table.currentSnapshot();
     assertNotNull("Table should have a snapshot after writing data", snapshot);
 
-    // Verify metrics are collected only for specified columns
-    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
-    assertNotNull(dataFile.valueCounts());
+    // Verify metrics are collected for specified columns in one file
+    DataFile dataFile = snapshot.addedDataFiles(table.io()).iterator().next();
+    Map<Integer, Long> valueCounts = dataFile.valueCounts();
+    assertNotNull("Value counts should not be null", valueCounts);
+    assertTrue("Value counts should exist for id (column 1)", 
valueCounts.containsKey(1));
+    assertEquals("Value count for id should be 1 in this file", 1L, 
valueCounts.get(1).longValue());
+    assertTrue("Value counts should exist for name (column 2)", 
valueCounts.containsKey(2));
+    assertEquals(
+        "Value count for name should be 1 in this file", 1L, 
valueCounts.get(2).longValue());

Review Comment:
   I think it's because we're creating the table with a partition spec above, 
so this test is actually writing 3 different data files, with one row in each.
   
   Can we remove the partition spec so we only have one data file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to