ahmedabu98 commented on code in PR #34140:
URL: https://github.com/apache/beam/pull/34140#discussion_r1980242838


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());

Review Comment:
   Btw we can continue using the same `table` instance above -- we don't have 
to loading it again or refresh



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable =
+        catalog.loadTable(
+            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected only for specified columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());
+    assertNotNull(dataFile.nullValueCounts());
+    assertNotNull(dataFile.columnSizes());
+  }
+
+  @Test
+  public void testDefaultMetrics() throws IOException {
+    // Set up table with default metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_default_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table.updateProperties().set("write.metadata.metrics.default", 
"full").commit();
+
+    // Create a RecordWriterManager
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    // Write a row
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    // Manually commit the data files to the Iceberg table
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable = catalog.loadTable(TableIdentifier.of("default", 
"test_default_metrics"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected for all columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());
+    assertNotNull(dataFile.nullValueCounts());
+    assertNotNull(dataFile.columnSizes());
+
+    // Verify metrics are collected for all columns
+    Map<Integer, Long> valueCounts = dataFile.valueCounts();
+    Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
+    Map<Integer, Long> columnSizes = dataFile.columnSizes();
+
+    for (int i = 1; i <= ICEBERG_SCHEMA.columns().size(); i++) {
+      assertTrue("Value counts should be collected for column " + i, 
valueCounts.containsKey(i));

Review Comment:
   we can also check the value of valueCounts.get(i) 



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable =
+        catalog.loadTable(

Review Comment:
   similar to previous comment, just use the same `table` 



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable =
+        catalog.loadTable(
+            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected only for specified columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());

Review Comment:
   can we also check the count for each column exists and is correct?



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable =
+        catalog.loadTable(
+            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected only for specified columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());
+    assertNotNull(dataFile.nullValueCounts());
+    assertNotNull(dataFile.columnSizes());
+  }
+
+  @Test
+  public void testDefaultMetrics() throws IOException {
+    // Set up table with default metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_default_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table.updateProperties().set("write.metadata.metrics.default", 
"full").commit();
+
+    // Create a RecordWriterManager
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    // Write a row
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    // Manually commit the data files to the Iceberg table
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable = catalog.loadTable(TableIdentifier.of("default", 
"test_default_metrics"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected for all columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());
+    assertNotNull(dataFile.nullValueCounts());
+    assertNotNull(dataFile.columnSizes());
+
+    // Verify metrics are collected for all columns
+    Map<Integer, Long> valueCounts = dataFile.valueCounts();
+    Map<Integer, Long> nullValueCounts = dataFile.nullValueCounts();
+    Map<Integer, Long> columnSizes = dataFile.columnSizes();

Review Comment:
   also check `.lowerBounds()` and `.upperBounds()` and their values



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -719,4 +725,113 @@ public void close() throws IOException {
       throw new IOException("I am failing!");
     }
   }
+
+  @Test
+  public void testColumnSpecificMetricsCollection() throws IOException {
+    // Set up table with column-specific metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_column_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table
+        .updateProperties()
+        .set("write.metadata.metrics.column.id", "counts")
+        .set("write.metadata.metrics.column.name", "counts")
+        .commit();
+
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    boolean writeSuccess = writerManager.write(windowedDestination, row);
+    assertTrue("Write operation should succeed", writeSuccess);
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertFalse("Data files should not be empty", dataFiles.isEmpty());
+
+    for (Map.Entry<WindowedValue<IcebergDestination>, 
List<SerializableDataFile>> entry :
+        dataFiles.entrySet()) {
+      Table tableToCommit = 
catalog.loadTable(entry.getKey().getValue().getTableIdentifier());
+      AppendFiles appendFiles = tableToCommit.newAppend();
+      for (SerializableDataFile dataFile : entry.getValue()) {
+        appendFiles.appendFile(dataFile.createDataFile(tableToCommit.specs()));
+      }
+      appendFiles.commit();
+      tableToCommit.refresh();
+    }
+
+    // Verify the correct table (where data was written)
+    Table dataTable =
+        catalog.loadTable(
+            TableIdentifier.of("default", 
"table_testColumnSpecificMetricsCollection"));
+    dataTable.refresh();
+
+    // Verify that a snapshot exists
+    Snapshot snapshot = dataTable.currentSnapshot();
+    assertNotNull("Table should have a snapshot after writing data", snapshot);
+
+    // Verify metrics are collected only for specified columns
+    DataFile dataFile = 
snapshot.addedDataFiles(dataTable.io()).iterator().next();
+    assertNotNull(dataFile.valueCounts());
+    assertNotNull(dataFile.nullValueCounts());
+    assertNotNull(dataFile.columnSizes());
+  }
+
+  @Test
+  public void testDefaultMetrics() throws IOException {
+    // Set up table with default metrics enabled
+    TableIdentifier tableId = TableIdentifier.of("default", 
"test_default_metrics");
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA, 
PARTITION_SPEC);
+    table.updateProperties().set("write.metadata.metrics.default", 
"full").commit();
+
+    // Create a RecordWriterManager
+    RecordWriterManager writerManager = new RecordWriterManager(catalog, 
"test_file_name", 1000, 3);
+
+    // Write a row
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();

Review Comment:
   for both of these tests, can we write multiple rows for good measure?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java:
##########
@@ -80,6 +84,7 @@ class RecordWriter {
                 .schema(table.schema())
                 .withSpec(table.spec())
                 .withPartition(partitionKey)
+                .metricsConfig(metricsConfig) // Pass the MetricsConfig

Review Comment:
   nit: the comment is unnecessary



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to