ahmedabu98 commented on code in PR #35696:
URL: https://github.com/apache/beam/pull/35696#discussion_r2237533031


##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDBWriteSchemaTransformProvider.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.mongodb;

Review Comment:
   This file probably made it into this PR by mistake? Same with the IT class?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -69,6 +70,13 @@ public class BigtableReadSchemaTransformProvider
                   Schema.FieldType.STRING,
                   Schema.FieldType.array(Schema.FieldType.row(CELL_SCHEMA))))
           .build();
+  public static final Schema FLATTENED_ROW_SCHEMA =
+      Schema.builder()
+          .addByteArrayField("key")
+          .addStringField("column_family")

Review Comment:
   Let's make this field name consistent with the Write side. Does this correspond to "family_name"?
   
https://github.com/apache/beam/blob/ef0486285dd025b4cd07ed9bfedf198e3e78571e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java#L192



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -88,7 +96,7 @@ public List<String> outputCollectionNames() {
   /** Configuration for reading from Bigtable. */
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  public abstract static class BigtableReadSchemaTransformConfiguration {
+  public abstract static class BigtableReadSchemaTransformConfiguration 
implements Serializable {

Review Comment:
   We shouldn't need `implements Serializable` here. Can we remove it? I think it might conflict with Beam's Schema conversion logic.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +165,123 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
                       .withInstanceId(configuration.getInstanceId())
                       .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : 
FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new 
BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new 
BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  // old logic for reference
+  //  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row,
+  // Row> {
+  //    @Override
+  //    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
+  //      // The collection of families is represented as a Map of column 
families.
+  //      // Each column family is represented as a Map of columns.
+  //      // Each column is represented as a List of cells
+  //      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
+  //      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+  //
+  //      for (Family fam : bigtableRow.getFamiliesList()) {
+  //        // Map of column qualifier to list of cells
+  //        Map<String, List<Row>> columns = new HashMap<>();
+  //        for (Column col : fam.getColumnsList()) {
+  //          List<Row> cells = new ArrayList<>();
+  //          for (Cell cell : col.getCellsList()) {
+  //            Row cellRow =
+  //                Row.withSchema(CELL_SCHEMA)
+  //                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
+  //                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  //                    .build();
+  //            cells.add(cellRow);
+  //          }
+  //          columns.put(col.getQualifier().toStringUtf8(), cells);
+  //        }
+  //        families.put(fam.getName(), columns);
+  //      }
+  //      Row beamRow =
+  //          Row.withSchema(ROW_SCHEMA)
+  //              .withFieldValue("key", 
ByteBuffer.wrap(bigtableRow.getKey().toByteArray()))
+  //              .withFieldValue("column_families", families)
+  //              .build();
+  //      return beamRow;
+  //    }
+  //  }

Review Comment:
   nit: clean up the commented-out code



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProvider.java:
##########
@@ -152,45 +165,123 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
                       .withInstanceId(configuration.getInstanceId())
                       .withProjectId(configuration.getProjectId()));
 
+      Schema outputSchema =
+          Boolean.FALSE.equals(configuration.getFlatten()) ? ROW_SCHEMA : 
FLATTENED_ROW_SCHEMA;
+
       PCollection<Row> beamRows =
-          bigtableRows.apply(MapElements.via(new 
BigtableRowToBeamRow())).setRowSchema(ROW_SCHEMA);
+          bigtableRows
+              .apply("ConvertToBeamRows", ParDo.of(new 
BigtableRowConverterDoFn(configuration)))
+              .setRowSchema(outputSchema);
 
       return PCollectionRowTuple.of(OUTPUT_TAG, beamRows);
     }
   }
 
-  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row, Row> {
-    @Override
-    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
-      // The collection of families is represented as a Map of column families.
-      // Each column family is represented as a Map of columns.
-      // Each column is represented as a List of cells
-      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
-      Map<String, Map<String, List<Row>>> families = new HashMap<>();
-
-      for (Family fam : bigtableRow.getFamiliesList()) {
-        // Map of column qualifier to list of cells
-        Map<String, List<Row>> columns = new HashMap<>();
-        for (Column col : fam.getColumnsList()) {
-          List<Row> cells = new ArrayList<>();
-          for (Cell cell : col.getCellsList()) {
-            Row cellRow =
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
-                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  // old logic for reference
+  //  public static class BigtableRowToBeamRow extends 
SimpleFunction<com.google.bigtable.v2.Row,
+  // Row> {
+  //    @Override
+  //    public Row apply(com.google.bigtable.v2.Row bigtableRow) {
+  //      // The collection of families is represented as a Map of column 
families.
+  //      // Each column family is represented as a Map of columns.
+  //      // Each column is represented as a List of cells
+  //      // Each cell is represented as a Beam Row consisting of value and 
timestamp_micros
+  //      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+  //
+  //      for (Family fam : bigtableRow.getFamiliesList()) {
+  //        // Map of column qualifier to list of cells
+  //        Map<String, List<Row>> columns = new HashMap<>();
+  //        for (Column col : fam.getColumnsList()) {
+  //          List<Row> cells = new ArrayList<>();
+  //          for (Cell cell : col.getCellsList()) {
+  //            Row cellRow =
+  //                Row.withSchema(CELL_SCHEMA)
+  //                    .withFieldValue("value", 
ByteBuffer.wrap(cell.getValue().toByteArray()))
+  //                    .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+  //                    .build();
+  //            cells.add(cellRow);
+  //          }
+  //          columns.put(col.getQualifier().toStringUtf8(), cells);
+  //        }
+  //        families.put(fam.getName(), columns);
+  //      }
+  //      Row beamRow =
+  //          Row.withSchema(ROW_SCHEMA)
+  //              .withFieldValue("key", 
ByteBuffer.wrap(bigtableRow.getKey().toByteArray()))
+  //              .withFieldValue("column_families", families)
+  //              .build();
+  //      return beamRow;
+  //    }
+  //  }
+  /**
+   * A {@link DoFn} that converts a Bigtable {@link 
com.google.bigtable.v2.Row} to a Beam {@link
+   * Row}. It supports both a nested representation and a flattened 
representation where each column
+   * becomes a separate output element.
+   */
+  private static class BigtableRowConverterDoFn extends 
DoFn<com.google.bigtable.v2.Row, Row> {
+    private final BigtableReadSchemaTransformConfiguration configuration;
+
+    BigtableRowConverterDoFn(BigtableReadSchemaTransformConfiguration 
configuration) {
+      this.configuration = configuration;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element com.google.bigtable.v2.Row bigtableRow, OutputReceiver<Row> 
out) {
+      // The builder defaults flatten to true. We check for an explicit false 
setting to disable it.
+      if (Boolean.FALSE.equals(configuration.getFlatten())) {
+        // Non-flattening logic (original behavior): one output row per 
Bigtable row.
+        Map<String, Map<String, List<Row>>> families = new HashMap<>();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          Map<String, List<Row>> columns = new HashMap<>();
+          for (Column col : fam.getColumnsList()) {
+            List<Row> cells = new ArrayList<>();
+            for (Cell cell : col.getCellsList()) {
+              Row cellRow =
+                  Row.withSchema(CELL_SCHEMA)
+                      .withFieldValue("value", cell.getValue().toByteArray())
+                      .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+                      .build();
+              cells.add(cellRow);
+            }
+            columns.put(col.getQualifier().toStringUtf8(), cells);
+          }
+          families.put(fam.getName(), columns);
+        }
+        Row beamRow =
+            Row.withSchema(ROW_SCHEMA)
+                .withFieldValue("key", bigtableRow.getKey().toByteArray())
+                .withFieldValue("column_families", families)
+                .build();
+        out.output(beamRow);
+      } else {
+        // Flattening logic (new behavior): one output row per column 
qualifier.
+        byte[] key = bigtableRow.getKey().toByteArray();
+        for (Family fam : bigtableRow.getFamiliesList()) {
+          String familyName = fam.getName();
+          for (Column col : fam.getColumnsList()) {
+            String qualifierName = col.getQualifier().toStringUtf8();
+            List<Row> cells = new ArrayList<>();
+            for (Cell cell : col.getCellsList()) {
+              Row cellRow =
+                  Row.withSchema(CELL_SCHEMA)
+                      .withFieldValue("value", cell.getValue().toByteArray())
+                      .withFieldValue("timestamp_micros", 
cell.getTimestampMicros())
+                      .build();
+              cells.add(cellRow);
+            }
+
+            Row flattenedRow =
+                Row.withSchema(FLATTENED_ROW_SCHEMA)
+                    .withFieldValue("key", key)
+                    .withFieldValue("column_family,", familyName)

Review Comment:
   typo: stray trailing comma in the field name
   ```suggestion
                       .withFieldValue("column_family", familyName)
   ```



##########
sdks/python/apache_beam/yaml/tests/bigtable.yaml:
##########
@@ -85,3 +85,52 @@ pipelines:
             project: 'apache-beam-testing'
             instance: "{BT_INSTANCE}"
             table: 'test-table'
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromBigTable
+          config:
+            project: 'apache-beam-testing'
+            instance: "{BT_INSTANCE}"
+            table: 'test-table'
+            flatten: True
+        - type: MapToFields
+          config:
+            language: python
+            fields:
+              key:
+                callable: |
+                  def convert_to_bytes(row):
+                    return row.key.decode("utf-8") if "key" in row._fields 
else None
+        - type: AssertEqual
+          config:
+            elements:
+              - {'key': 'row1'}
+              - {'key': 'row1' }

Review Comment:
   Can we validate the rest of the columns too?



##########
sdks/python/apache_beam/yaml/tests/bigtable.yaml:
##########
@@ -85,3 +85,52 @@ pipelines:
             project: 'apache-beam-testing'
             instance: "{BT_INSTANCE}"
             table: 'test-table'
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromBigTable
+          config:
+            project: 'apache-beam-testing'
+            instance: "{BT_INSTANCE}"
+            table: 'test-table'
+            flatten: True

Review Comment:
   Let's remove this line so the test verifies that we get flattened rows by default.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadSchemaTransformProviderIT.java:
##########
@@ -136,95 +136,197 @@ public void tearDown() {
     tableAdminClient.close();
   }
 
-  public List<Row> writeToTable(int numRows) {
+  @Test
+  public void testRead() {
+    int numRows = 20;
     List<Row> expectedRows = new ArrayList<>();
+    for (int i = 1; i <= numRows; i++) {
+      String key = "key" + i;
+      byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+      String valueA = "value a" + i;
+      byte[] valueABytes = valueA.getBytes(StandardCharsets.UTF_8);
+      String valueB = "value b" + i;
+      byte[] valueBBytes = valueB.getBytes(StandardCharsets.UTF_8);
+      String valueC = "value c" + i;
+      byte[] valueCBytes = valueC.getBytes(StandardCharsets.UTF_8);
+      String valueD = "value d" + i;
+      byte[] valueDBytes = valueD.getBytes(StandardCharsets.UTF_8);
+      long timestamp = 1000L * i;
 
-    try {
-      for (int i = 1; i <= numRows; i++) {
-        String key = "key" + i;
-        String valueA = "value a" + i;
-        String valueB = "value b" + i;
-        String valueC = "value c" + i;
-        String valueD = "value d" + i;
-        long timestamp = 1000L * i;
-
-        RowMutation rowMutation =
-            RowMutation.create(tableId, key)
-                .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
-                .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
-                .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
-                .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
-        dataClient.mutateRow(rowMutation);
-
-        // Set up expected Beam Row
-        Map<String, List<Row>> columns1 = new HashMap<>();
-        columns1.put(
-            "a",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueA.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns1.put(
-            "b",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueB.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, List<Row>> columns2 = new HashMap<>();
-        columns2.put(
-            "c",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueC.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-        columns2.put(
-            "d",
-            Arrays.asList(
-                Row.withSchema(CELL_SCHEMA)
-                    .withFieldValue(
-                        "value", 
ByteBuffer.wrap(valueD.getBytes(StandardCharsets.UTF_8)))
-                    .withFieldValue("timestamp_micros", timestamp)
-                    .build()));
-
-        Map<String, Map<String, List<Row>>> families = new HashMap<>();
-        families.put(COLUMN_FAMILY_NAME_1, columns1);
-        families.put(COLUMN_FAMILY_NAME_2, columns2);
-
-        Row expectedRow =
-            Row.withSchema(ROW_SCHEMA)
-                .withFieldValue("key", 
ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)))
-                .withFieldValue("column_families", families)
-                .build();
-
-        expectedRows.add(expectedRow);
-      }
-      LOG.info("Finished writing {} rows to table {}", numRows, tableId);
-    } catch (NotFoundException e) {
-      throw new RuntimeException("Failed to write to table", e);
+      RowMutation rowMutation =
+          RowMutation.create(tableId, key)
+              .setCell(COLUMN_FAMILY_NAME_1, "a", timestamp, valueA)
+              .setCell(COLUMN_FAMILY_NAME_1, "b", timestamp, valueB)
+              .setCell(COLUMN_FAMILY_NAME_2, "c", timestamp, valueC)
+              .setCell(COLUMN_FAMILY_NAME_2, "d", timestamp, valueD);
+      dataClient.mutateRow(rowMutation);
+
+      // Set up expected Beam Row
+      // FIX: Use byte[] instead of ByteBuffer
+      Map<String, List<Row>> columns1 = new HashMap<>();
+      columns1.put(
+          "a",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueABytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns1.put(
+          "b",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueBBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, List<Row>> columns2 = new HashMap<>();
+      columns2.put(
+          "c",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueCBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+      columns2.put(
+          "d",
+          Arrays.asList(
+              Row.withSchema(CELL_SCHEMA)
+                  .withFieldValue("value", valueDBytes)
+                  .withFieldValue("timestamp_micros", timestamp)
+                  .build()));
+
+      Map<String, Map<String, List<Row>>> families = new HashMap<>();
+      families.put(COLUMN_FAMILY_NAME_1, columns1);
+      families.put(COLUMN_FAMILY_NAME_2, columns2);
+
+      Row expectedRow =
+          Row.withSchema(ROW_SCHEMA)
+              .withFieldValue("key", keyBytes)
+              .withFieldValue("column_families", families)
+              .build();
+
+      expectedRows.add(expectedRow);
     }
-    return expectedRows;
+    LOG.info("Finished writing {} rows to table {}", numRows, tableId);
+
+    BigtableReadSchemaTransformConfiguration config =
+        BigtableReadSchemaTransformConfiguration.builder()
+            .setTableId(tableId)
+            .setInstanceId(instanceId)
+            .setProjectId(projectId)
+            .setFlatten(false)
+            .build();
+
+    SchemaTransform transform = new 
BigtableReadSchemaTransformProvider().from(config);
+
+    PCollection<Row> rows = 
PCollectionRowTuple.empty(p).apply(transform).get("output");
+
+    LOG.info("This is the rows: " + rows);

Review Comment:
   cleanup? (leftover debug logging)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to