gemini-code-assist[bot] commented on code in PR #35696:
URL: https://github.com/apache/beam/pull/35696#discussion_r2237857129


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `testRead` and `testReadFlatten` methods contain a significant amount of 
duplicated code for writing test data to Bigtable. To improve test 
maintainability and readability, consider extracting this common setup logic 
into a private helper method. For example, a `writeTestRows(int numRows)` 
method could populate the Bigtable instance, and both tests could call it at 
the beginning.
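   
   For illustration, a minimal sketch of such a helper (hypothetical; it 
assumes the test class's existing `dataClient`, `tableId`, and column-family 
constants shown in this hunk). Since the nested and flattened tests expect 
different Beam Rows, the helper could own only the writes, leaving each test 
to build its own expectations:
   
   ```java
   // Hypothetical helper, assuming the test class's existing dataClient,
   // tableId, COLUMN_FAMILY_NAME_1, and COLUMN_FAMILY_NAME_2 fields.
   // Writes numRows Bigtable rows, each with columns a/b in family 1 and
   // c/d in family 2, using timestamp 1000L * i as the tests do today.
   private void writeTestRows(int numRows) {
     for (int i = 1; i <= numRows; i++) {
       String key = "key" + i;
       long timestamp = 1000L * i;
       RowMutation rowMutation =
           RowMutation.create(tableId, key)
               .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, "value a" + i)
               .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, "value b" + i)
               .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, "value c" + i)
               .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, "value d" + i);
       dataClient.mutateRow(rowMutation);
     }
   }
   ```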



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@ public void tearDown() {
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      // FIX: Use byte[] instead of ByteBuffer
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    LOG.info("This is the rows: " + rows);
+
+    PAssert.that(rows).containsInAnyOrder(expectedRows);
+    p.run().waitUntilFinish();
   }
 
   @Test
-  public void testRead() {
-    List<Row> expectedRows = writeToTable(20);
+  public void testReadFlatten() {
+    int numRows = 20;
+    List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
+      // Write a row with four distinct columns to Bigtable
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // For each Bigtable row, we expect four flattened Beam Rows as output.
+      // Each Row corresponds to one column.
+      expectedRows.add(
+          Row.withSchema(FLATTENED_ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   In this test, when constructing the expected flattened rows, you are using 
incorrect field names. The `FLATTENED_ROW_SCHEMA` defines the fields as 
`column_family` and `column_qualifier`, but the test uses `family` and 
`qualifier`. This will cause the test to fail.
   
   This occurs in four places when building the `expectedRows` list: first on 
lines 261-262, then again on lines 275-276, 289-290, and 303-304.
   
   Please update these to use the correct field names from the schema.
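   
   As a sketch, the corrected construction would look like the following. 
Beyond the two renamed fields, this assumes `FLATTENED_ROW_SCHEMA`'s 
remaining fields are `value` and `timestamp_micros` (as in `CELL_SCHEMA`); 
`keyBytes`, `valueABytes`, and `timestamp` are the test's existing locals:
   
   ```java
   // "column_family" and "column_qualifier" match FLATTENED_ROW_SCHEMA;
   // the test currently uses "family" and "qualifier" here.
   expectedRows.add(
       Row.withSchema(FLATTENED_ROW_SCHEMA)
           .withFieldValue("key", keyBytes)
           .withFieldValue("column_family", COLUMN_FAMILY_NAME_1)
           .withFieldValue("column_qualifier", "a")
           .withFieldValue("value", valueABytes)
           .withFieldValue("timestamp_micros", timestamp)
           .build());
   ```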



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +165,87 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                       .withInstanceId(configuration.getInstanceId())
                       .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened representation where each column
+   * becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> out) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There's some code duplication in the `if` and `else` blocks for converting a 
list of Bigtable `Cell`s to a list of Beam `Row`s. To improve maintainability 
and reduce redundancy, you could extract this logic into a private helper 
method.
   
   ```java
   private List<Row> convertCells(List<com.google.bigtable.v2.Cell> bigtableCells) {
     List<Row> beamCells = new ArrayList<>();
     for (com.google.bigtable.v2.Cell cell : bigtableCells) {
       Row cellRow =
           Row.withSchema(CELL_SCHEMA)
               .withFieldValue("value", cell.getValue().toByteArray())
               .withFieldValue("timestamp_micros", cell.getTimestampMicros())
               .build();
       beamCells.add(cellRow);
     }
     return beamCells;
   }
   ```
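   
   Both branches could then delegate to this helper. A sketch of the 
nested-output call site, assuming the loop structure of the removed 
`BigtableRowToBeamRow` above:
   
   ```java
   // Hypothetical call site inside processElement's nested branch.
   Map<String, Map<String, List<Row>>> families = new HashMap<>();
   for (Family fam : bigtableRow.getFamiliesList()) {
     // One list of converted cell Rows per column qualifier.
     Map<String, List<Row>> columns = new HashMap<>();
     for (Column col : fam.getColumnsList()) {
       columns.put(col.getQualifier().toStringUtf8(), convertCells(col.getCellsList()));
     }
     families.put(fam.getName(), columns);
   }
   ```
   
   The flattened branch would similarly call `convertCells` once per column 
before emitting one output Row per cell.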



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      // FIX: Use byte[] instead of ByteBuffer
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    LOG.info("This is the rows: " + rows);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This log statement appears to be for debugging purposes. It should be 
removed from the final test code to keep the test output clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
