gemini-code-assist[bot] commented on code in PR #35696:
URL: https://github.com/apache/beam/pull/35696#discussion_r2237857129
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test

Review Comment:
The `testRead` and `testReadFlatten` methods contain a significant amount of duplicated code for writing test data to Bigtable. To improve test maintainability and readability, consider extracting this common setup logic into a private helper method. For example, a `writeTestRows(int numRows)` method could populate the Bigtable instance, and both tests could call it at the beginning.
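A minimal sketch of such a helper, assuming the test class's existing `dataClient`, `tableId`, `LOG`, and `COLUMN_FAMILY_NAME_*` fields (the name `writeTestRows` is only the reviewer's suggested example):

```java
// Hypothetical helper consolidating the duplicated write loop. It only
// writes the rows; each test still builds its own expectedRows list.
private void writeTestRows(int numRows) {
  for (int i = 1; i <= numRows; i++) {
    String key = "key" + i;
    long timestamp = 1000L * i;
    RowMutation rowMutation =
        RowMutation.create(tableId, key)
            .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, "value a" + i)
            .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, "value b" + i)
            .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, "value c" + i)
            .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, "value d" + i);
    dataClient.mutateRow(rowMutation);
  }
  LOG.info("Finished writing {} rows to table {}", numRows, tableId);
}
```

Both tests could then start with `writeTestRows(20);` and differ only in how they assemble their expected output.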
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@ public void tearDown() {
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      // FIX: Use byte[] instead of ByteBuffer
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    LOG.info("This is the rows: " + rows);
+
+    PAssert.that(rows).containsInAnyOrder(expectedRows);
+    p.run().waitUntilFinish();
   }
 
   @Test
-  public void testRead() {
-    List<Row> expectedRows = writeToTable(20);
+  public void testReadFlatten() {
+    int numRows = 20;
+    List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
+      // Write a row with four distinct columns to Bigtable
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // For each Bigtable row, we expect four flattened Beam Rows as output.
+      // Each Row corresponds to one column.
+      expectedRows.add(
+          Row.withSchema(FLATTENED_ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)

Review Comment:
In this test, the expected flattened rows are constructed with incorrect field names: `FLATTENED_ROW_SCHEMA` defines the fields as `column_family` and `column_qualifier`, but the test uses `family` and `qualifier`, which will cause the test to fail. This occurs in four places when building the `expectedRows` list (lines 261-262, 275-276, 289-290, and 303-304). Please update these to use the field names defined in the schema.
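For illustration, one of the four expectations rewritten with the schema's field names could look like the sketch below. Treating `column_qualifier` as a string and including per-column `value`/`timestamp_micros` fields are assumptions here, since the diff context above cuts off before those lines:

```java
// Hypothetical corrected expectation for column "a" of the first family;
// "column_family" and "column_qualifier" are the names FLATTENED_ROW_SCHEMA
// defines, per the review comment. The remaining fields are assumed.
expectedRows.add(
    Row.withSchema(FLATTENED_ROW_SCHEMA)
        .withFieldValue("key", keyBytes)
        .withFieldValue("column_family", COLUMN_FAMILY_NAME_1)
        .withFieldValue("column_qualifier", "a")
        .withFieldValue("value", valueABytes)
        .withFieldValue("timestamp_micros", timestamp)
        .build());
```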
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +165,87 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
                   .withInstanceId(configuration.getInstanceId())
                   .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", cell.getTimestampMicros())
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened representation where each
+   * column becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> out) {

Review Comment:
There's some code duplication in the `if` and `else` blocks for converting a list of Bigtable `Cell`s to a list of Beam `Row`s. To improve maintainability and reduce redundancy, you could extract this logic into a private helper method:

```java
private List<Row> convertCells(List<com.google.bigtable.v2.Cell> bigtableCells) {
  List<Row> beamCells = new ArrayList<>();
  for (com.google.bigtable.v2.Cell cell : bigtableCells) {
    Row cellRow =
        Row.withSchema(CELL_SCHEMA)
            .withFieldValue("value", cell.getValue().toByteArray())
            .withFieldValue("timestamp_micros", cell.getTimestampMicros())
            .build();
    beamCells.add(cellRow);
  }
  return beamCells;
}
```
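With such a helper in place, both branches of `processElement` reduce to schema-specific assembly around a `convertCells` call. Below is a sketch of the nested (`flatten=false`) branch, reusing the proto accessors from the removed `BigtableRowToBeamRow` code; how the actual PR structures its branches is not shown in the diff above, so this is only an assumed shape:

```java
// Hypothetical body for the nested branch of processElement, built from the
// deleted BigtableRowToBeamRow logic plus the suggested convertCells helper.
// Family, Column, and Cell are the com.google.bigtable.v2 proto types.
Map<String, Map<String, List<Row>>> families = new HashMap<>();
for (Family fam : bigtableRow.getFamiliesList()) {
  Map<String, List<Row>> columns = new HashMap<>();
  for (Column col : fam.getColumnsList()) {
    columns.put(col.getQualifier().toStringUtf8(), convertCells(col.getCellsList()));
  }
  families.put(fam.getName(), columns);
}
out.output(
    Row.withSchema(ROW_SCHEMA)
        .withFieldValue("key", bigtableRow.getKey().toByteArray())
        .withFieldValue("column_families", families)
        .build());
```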
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      // FIX: Use byte[] instead of ByteBuffer
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    LOG.info("This is the rows: " + rows);

Review Comment:
This log statement appears to be for debugging purposes. It should be removed from the final test code to keep the test output clean.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org