Polber commented on code in PR #31987:
URL: https://github.com/apache/beam/pull/31987#discussion_r1691868652


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      PCollection<Struct> spannerRows = null; 
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+          spannerRows = input.getPipeline().apply(
+            SpannerIO.readWithSchema()
+            .withProjectId(configuration.getProjectId())
+            .withInstanceId(configuration.getInstanceId())
+            .withDatabaseId(configuration.getDatabaseId())
+            .withQuery(configuration.getQuery())
+            );
+      } 
+      else {
+        spannerRows = input.getPipeline().apply(
+          SpannerIO.readWithSchema()
+          .withProjectId(configuration.getProjectId())
+          .withInstanceId(configuration.getInstanceId())
+          .withDatabaseId(configuration.getDatabaseId())
+          .withTable(configuration.getTableId())
+          .withColumns(configuration.getColumns())
+          );
+      }

Review Comment:
   the `Configuration.validate()` method already does checks to ensure the correct combination of parameters is specified, so you can just chain the specified ones together and reduce code duplication.
   
   i.e.
   ```
   SpannerIO.Read read = SpannerIO.readWithSchema();
   
   if (!Strings.isNullOrEmpty(configuration.getProjectId())) {
     read = read.withProjectId(configuration.getProjectId());
   }
   if (!Strings.isNullOrEmpty(configuration.getInstanceId())) {
     read = read.withInstanceId(configuration.getInstanceId());
   }
   
   ...
   
   PCollection<Struct> spannerRows = input.getPipeline().apply(read);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      PCollection<Struct> spannerRows = null; 
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+          spannerRows = input.getPipeline().apply(
+            SpannerIO.readWithSchema()
+            .withProjectId(configuration.getProjectId())
+            .withInstanceId(configuration.getInstanceId())
+            .withDatabaseId(configuration.getDatabaseId())
+            .withQuery(configuration.getQuery())
+            );
+      } 
+      else {
+        spannerRows = input.getPipeline().apply(
+          SpannerIO.readWithSchema()
+          .withProjectId(configuration.getProjectId())
+          .withInstanceId(configuration.getInstanceId())
+          .withDatabaseId(configuration.getDatabaseId())
+          .withTable(configuration.getTableId())
+          .withColumns(configuration.getColumns())
+          );
+      }
+
+      // Hardcoded for testing
+      /*
+      Schema schema = Schema.builder()
+            .addField("id_column", Schema.FieldType.INT64)
+            .addField("name_column", Schema.FieldType.STRING)
+            .build();
+      */
+      // Implement when getSchema() is available

Review Comment:
   Please remove
   ```suggestion
   ```



##########
sdks/python/apache_beam/yaml/standard_io.yaml:
##########
@@ -257,3 +257,29 @@
         'WriteToJdbc': 'beam:schematransform:org.apache.beam:jdbc_write:v1'
       config:
        gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
+
+- type: renaming
+  transforms:
+    'ReadFromSpanner': 'ReadFromSpanner'
+    'WriteToSpanner': 'WriteToSpanner'
+  config:
+    mappings:
+      'ReadFromSpanner':
+        project_id: 'project_id'
+        instance_id: 'instance_id'
+        database_id: 'database_id'
+        table_id: 'table_id'
+        query: 'query'
+        columns: 'columns'
+      'WriteToSpanner':
+        instance_id: 'instance_id'
+        database_id: 'database_id'
+        table_id: 'table_id'

Review Comment:
   I would strip the `_id` from all of these parameters on the YAML side
   
   i.e.
   ```suggestion
           project: 'project_id'
           instance: 'instance_id'
           database: 'database_id'
           table: 'table_id'
           query: 'query'
           columns: 'columns'
         'WriteToSpanner':
           instance: 'instance_id'
           database: 'database_id'
           table: 'table_id'
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
   @Override
   public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
       outputCollectionNames() {
-    return Arrays.asList("failures", "errors");
+    return Collections.singletonList("post-write");
   }
 
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
 
+    @SchemaFieldDescription("Specifies the GCP project.")
+    @Nullable
+    public abstract String getProjectId();
+
     @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+    @Nullable
     public abstract String getInstanceId();
 
     @SchemaFieldDescription("Specifies the Cloud Spanner database.")
+    @Nullable
     public abstract String getDatabaseId();
 
     @SchemaFieldDescription("Specifies the Cloud Spanner table.")
+    @Nullable
     public abstract String getTableId();

Review Comment:
   I would remove the `@Nullable` tag if these are all required. I'm not sure if project is required, but maybe check on that one too.
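   
   i.e. a minimal sketch, assuming these fields are in fact required (the `validate()` checks against empty strings would still apply):
   ```
   @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
   public abstract String getInstanceId(); // @Nullable dropped: required in all cases
   
   @SchemaFieldDescription("Specifies the Cloud Spanner database.")
   public abstract String getDatabaseId(); // @Nullable dropped: required in all cases
   ```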



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
                                   Objects.requireNonNull(row))) {}))
               .apply(
                   SpannerIO.write()
+                      .withProjectId(configuration.getProjectId())
                       .withDatabaseId(configuration.getDatabaseId())
                       .withInstanceId(configuration.getInstanceId())
-                      .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+                      .withFailureMode(failureMode));

Review Comment:
   nit: can inline from above
   ```suggestion
                      .withFailureMode(handleErrors ? FailureMode.REPORT_FAILURES : FailureMode.FAIL_FAST));
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java:
##########
@@ -351,4 +355,63 @@ private static void addIterableToMutationBuilder(
                 beamIterableType.getTypeName()));
     }
   }
+  public static Row createRowFromMutation(Schema schema, Mutation mutation) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    Iterable<Value> values = mutation.getValues();
+    Iterator<Value> valuesItr = values.iterator();
+    
+    while (valuesItr.hasNext()) {
+      Value value = valuesItr.next();
+      rowBuilder.addValue(convertValueToBeamFieldType(value));
+    }

Review Comment:
   I'm worried the ordering of the mutation fields iterator isn't guaranteed to be the same as the initial row that was written.
   
   `Mutation` has a built-in method `asMap()` that returns a map `{columnName: value}` that you could loop over and use `rowBuilder.withFieldValue(column, value)` instead
   
   Something like
   ```
   mutation.asMap().forEach(
           (column, value) -> rowBuilder.withFieldValue(column, getMutationValue(value)));
   ```
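   
   (`getMutationValue` above is a hypothetical helper, not something already in the PR; a rough sketch of what it might look like, covering only a few scalar types:)
   ```
   // Hypothetical helper: unwraps a com.google.cloud.spanner.Value into a plain
   // Java object that Row.Builder#withFieldValue can accept. Type coverage here is
   // deliberately incomplete; extend the switch for the remaining Type.Codes.
   private static Object getMutationValue(Value value) {
     if (value.isNull()) {
       return null;
     }
     switch (value.getType().getCode()) {
       case BOOL:
         return value.getBool();
       case INT64:
         return value.getInt64();
       case FLOAT64:
         return value.getFloat64();
       case STRING:
         return value.getString();
       case BYTES:
         return value.getBytes().toByteArray();
       default:
         throw new IllegalArgumentException("Unsupported Spanner type: " + value.getType());
     }
   }
   ```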



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
                                   Objects.requireNonNull(row))) {}))
               .apply(
                   SpannerIO.write()
+                      .withProjectId(configuration.getProjectId())
                       .withDatabaseId(configuration.getDatabaseId())
                       .withInstanceId(configuration.getInstanceId())
-                      .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+                      .withFailureMode(failureMode));
+
+                      PCollection<Row> postWrite =
+                      result
+                          .getFailedMutations()
+                          .apply("post-write", ParDo.of(new NoOutputDoFn<MutationGroup>()))
+                          .setRowSchema(Schema.of());
+              
+      if (!handleErrors) 
+          return PCollectionRowTuple.of("post-write", postWrite);
+          
+      Schema inputSchema = input.get("input").getSchema();
       Schema failureSchema =
-          Schema.builder()
-              .addStringField("operation")
-              .addStringField("instanceId")
-              .addStringField("databaseId")
-              .addStringField("tableId")
-              .addStringField("mutationData")
-              .build();
+          Schema.of(
+            Field.of("error_message", FieldType.STRING),
+            Field.of("failed_row", FieldType.row(inputSchema)));

Review Comment:
   ```suggestion
             ErrorHandling.errorSchema(inputSchema);
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      PCollection<Struct> spannerRows = null; 
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+          spannerRows = input.getPipeline().apply(
+            SpannerIO.readWithSchema()
+            .withProjectId(configuration.getProjectId())
+            .withInstanceId(configuration.getInstanceId())
+            .withDatabaseId(configuration.getDatabaseId())
+            .withQuery(configuration.getQuery())
+            );
+      } 
+      else {
+        spannerRows = input.getPipeline().apply(
+          SpannerIO.readWithSchema()
+          .withProjectId(configuration.getProjectId())
+          .withInstanceId(configuration.getInstanceId())
+          .withDatabaseId(configuration.getDatabaseId())
+          .withTable(configuration.getTableId())
+          .withColumns(configuration.getColumns())
+          );
+      }
+
+      // Hardcoded for testing
+      /*
+      Schema schema = Schema.builder()
+            .addField("id_column", Schema.FieldType.INT64)
+            .addField("name_column", Schema.FieldType.STRING)
+            .build();
+      */
+      // Implement when getSchema() is available
+      Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class))
+          .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+          return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+    @AutoValue.Builder
+    @Nullable
+    public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+      public abstract Builder setInstanceId(String instanceId);
+      public abstract Builder setDatabaseId(String databaseId);
+      public abstract Builder setTableId(String tableId);
+      public abstract Builder setQuery(String query);
+      public abstract Builder setColumns(List<String> columns);
+      public abstract SpannerReadSchemaTransformConfiguration build();
+    }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      if (!Strings.isNullOrEmpty(this.getQuery())) {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query.");
+      }
+      else {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for table read.");
+        checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for table read.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for table read.");
+        checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
+      }

Review Comment:
   This logic can be simplified. For example, several of the checks are duplicated.
   
   Also add a check for mutual exclusivity between `query` and `tableId`; e.g. the sketch below.
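   
   Something along these lines (a sketch only; assumes `checkArgument` from the vendored Guava `Preconditions` is also statically imported):
   ```
   public void validate() {
     String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
     // These three are required in both modes, so check them once.
     checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified.");
     checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified.");
     checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified.");
   
     boolean hasQuery = !Strings.isNullOrEmpty(this.getQuery());
     boolean hasTable = !Strings.isNullOrEmpty(this.getTableId());
     // Mutual exclusivity: exactly one of 'query' or 'tableId' must be set.
     checkArgument(
         hasQuery ^ hasTable,
         invalidConfigMessage + "Exactly one of 'query' or 'tableId' must be specified.");
     if (hasTable) {
       checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
     }
   }
   ```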



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
   @Override
   public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
       outputCollectionNames() {
-    return Arrays.asList("failures", "errors");
+    return Collections.singletonList("post-write");
   }
 
   @AutoValue
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
 
+    @SchemaFieldDescription("Specifies the GCP project.")
+    @Nullable
+    public abstract String getProjectId();
+
     @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+    @Nullable
     public abstract String getInstanceId();
 
     @SchemaFieldDescription("Specifies the Cloud Spanner database.")
+    @Nullable
     public abstract String getDatabaseId();
 
     @SchemaFieldDescription("Specifies the Cloud Spanner table.")
+    @Nullable
     public abstract String getTableId();
 
+    @SchemaFieldDescription("Specifies how to handle errors.")
+    @Nullable
+    public abstract ErrorHandling getErrorHandling();
+
     public static Builder builder() {
       return new AutoValue_SpannerWriteSchemaTransformProvider_SpannerWriteSchemaTransformConfiguration
           .Builder();
     }
 
     @AutoValue.Builder
     public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+
       public abstract Builder setInstanceId(String instanceId);
 
       public abstract Builder setDatabaseId(String databaseId);
 
       public abstract Builder setTableId(String tableId);
 
+      public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
       public abstract SpannerWriteSchemaTransformConfiguration build();
     }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Spanner Write configuration: ";
+
+      checkArgument(
+        !Strings.isNullOrEmpty(this.getProjectId()),
+        invalidConfigMessage + "Project ID for a Spanner Write must be specified.");
+
+      checkArgument(
+        !Strings.isNullOrEmpty(this.getInstanceId()),
+        invalidConfigMessage + "Instance ID for a Spanner Write must be specified.");
+
+      checkArgument(
+        !Strings.isNullOrEmpty(this.getDatabaseId()),
+        invalidConfigMessage + "Database ID for a Spanner Write must be specified.");
+
+      checkArgument(
+        !Strings.isNullOrEmpty(this.getTableId()),
+        invalidConfigMessage + "Table ID for a Spanner Write must be specified.");
+
+      if (this.getErrorHandling() != null) {
+        checkArgument(
+            !Strings.isNullOrEmpty(this.getErrorHandling().getOutput()),
+            invalidConfigMessage + "Output must not be empty if error handling specified.");
+      }

Review Comment:
   I would remove this check. We already check for error output in the transform itself, and the YAML side checks that the `error_handling` tag has `output`.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -112,44 +139,48 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
                                   Objects.requireNonNull(row))) {}))
               .apply(
                   SpannerIO.write()
+                      .withProjectId(configuration.getProjectId())
                       .withDatabaseId(configuration.getDatabaseId())
                       .withInstanceId(configuration.getInstanceId())
-                      .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
+                      .withFailureMode(failureMode));
+
+                      PCollection<Row> postWrite =
+                      result
+                          .getFailedMutations()
+                          .apply("post-write", ParDo.of(new NoOutputDoFn<MutationGroup>()))
+                          .setRowSchema(Schema.of());
+              
+      if (!handleErrors) 
+          return PCollectionRowTuple.of("post-write", postWrite);
+          
+      Schema inputSchema = input.get("input").getSchema();
       Schema failureSchema =
-          Schema.builder()
-              .addStringField("operation")
-              .addStringField("instanceId")
-              .addStringField("databaseId")
-              .addStringField("tableId")
-              .addStringField("mutationData")
-              .build();
+          Schema.of(
+            Field.of("error_message", FieldType.STRING),
+            Field.of("failed_row", FieldType.row(inputSchema)));
+
       PCollection<Row> failures =
           result
               .getFailedMutations()
               .apply(
                   FlatMapElements.into(TypeDescriptors.rows())
                       .via(
                           mtg ->
-                              Objects.requireNonNull(mtg).attached().stream()
+                              StreamSupport.stream(Objects.requireNonNull(mtg).spliterator(), false)
                                   .map(
                                       mutation ->
                                           Row.withSchema(failureSchema)
-                                              .addValue(mutation.getOperation().toString())
-                                              .addValue(configuration.getInstanceId())
-                                              .addValue(configuration.getDatabaseId())
-                                              .addValue(mutation.getTable())
-                                              // TODO(pabloem): Figure out how to represent
-                                              // mutation
-                                              //  contents in DLQ
-                                              .addValue(
-                                                  Iterators.toString(
-                                                      mutation.getValues().iterator()))
-                                              .build())
-                                  .collect(Collectors.toList())))
+                                            .withFieldValue("error_message", String.format("%s operation failed at instance: %s, database: %s, table: %s",
+                                              mutation.getOperation().toString(), configuration.getInstanceId(), configuration.getDatabaseId(), mutation.getTable()))

Review Comment:
   nit: the `toString()` call is implicit in `String.format`
   ```suggestion
                                                 mutation.getOperation(), configuration.getInstanceId(), configuration.getDatabaseId(), mutation.getTable()))
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,179 @@
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
+import org.apache.beam.sdk.io.gcp.spanner.StructUtils;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.Read;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import javax.annotation.Nullable;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class SpannerReadSchemaTransformProvider
+    extends TypedSchemaTransformProvider<
+        SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+
+  static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
+    private final SpannerReadSchemaTransformConfiguration configuration;
+
+    SpannerSchemaTransformRead(SpannerReadSchemaTransformConfiguration configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      checkNotNull(input, "Input to SpannerReadSchemaTransform cannot be null.");
+      PCollection<Struct> spannerRows = null; 
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+          spannerRows = input.getPipeline().apply(
+            SpannerIO.readWithSchema()
+            .withProjectId(configuration.getProjectId())
+            .withInstanceId(configuration.getInstanceId())
+            .withDatabaseId(configuration.getDatabaseId())
+            .withQuery(configuration.getQuery())
+            );
+      } 
+      else {
+        spannerRows = input.getPipeline().apply(
+          SpannerIO.readWithSchema()
+          .withProjectId(configuration.getProjectId())
+          .withInstanceId(configuration.getInstanceId())
+          .withDatabaseId(configuration.getDatabaseId())
+          .withTable(configuration.getTableId())
+          .withColumns(configuration.getColumns())
+          );
+      }
+
+      // Hardcoded for testing
+      /*
+      Schema schema = Schema.builder()
+            .addField("id_column", Schema.FieldType.INT64)
+            .addField("name_column", Schema.FieldType.STRING)
+            .build();
+      */
+      // Implement when getSchema() is available
+      Schema schema = spannerRows.getSchema();
+      PCollection<Row> rows = spannerRows.apply(MapElements.into(TypeDescriptor.of(Row.class))
+          .via((Struct struct) -> StructUtils.structToBeamRow(struct, schema)));
+
+          return PCollectionRowTuple.of("output", rows.setRowSchema(schema));
+    }
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_read:v1";
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+      outputCollectionNames() {
+    return Collections.singletonList("output");
+  }
+
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class SpannerReadSchemaTransformConfiguration implements Serializable {
+    @AutoValue.Builder
+    @Nullable
+    public abstract static class Builder {
+      public abstract Builder setProjectId(String projectId);
+      public abstract Builder setInstanceId(String instanceId);
+      public abstract Builder setDatabaseId(String databaseId);
+      public abstract Builder setTableId(String tableId);
+      public abstract Builder setQuery(String query);
+      public abstract Builder setColumns(List<String> columns);
+      public abstract SpannerReadSchemaTransformConfiguration build();
+    }
+
+    public void validate() {
+      String invalidConfigMessage = "Invalid Cloud Spanner Read configuration: ";
+      if (!Strings.isNullOrEmpty(this.getQuery())) {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for SQL query.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for SQL query.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for SQL query.");
+      }
+      else {
+        checkNotNull(this.getProjectId(), invalidConfigMessage + "Project ID must be specified for table read.");
+        checkNotNull(this.getTableId(), invalidConfigMessage + "Table name must be specified for table read.");
+        checkNotNull(this.getInstanceId(), invalidConfigMessage + "Instance ID must be specified for table read.");
+        checkNotNull(this.getDatabaseId(), invalidConfigMessage + "Database ID must be specified for table read.");
+        checkNotNull(this.getColumns(), invalidConfigMessage + "Columns must be specified for table read.");
+      }
+    }
+
+    public static Builder builder() {
+      return new AutoValue_SpannerReadSchemaTransformProvider_SpannerReadSchemaTransformConfiguration
+          .Builder();
+    }
+    @SchemaFieldDescription("Specifies the GCP project ID.")
+    @Nullable
+    public abstract String getProjectId();
+
+    @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
+    @Nullable
+    public abstract String getInstanceId();
+
+    @SchemaFieldDescription("Specifies the Cloud Spanner database.")
+    @Nullable
+    public abstract String getDatabaseId();

Review Comment:
   I think both of these are required in all cases, so maybe remove the `@Nullable` tags (and from any other required fields I may have missed).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java:
##########
@@ -167,36 +198,77 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
   @Override
   public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
       outputCollectionNames() {
-    return Arrays.asList("failures", "errors");
+    return Collections.singletonList("post-write");

Review Comment:
   I don't think this method is relevant anymore, but just for clarity, let's add the "errors" tag to express that the output is expected to have 2 tags
   ```suggestion
       return Arrays.asList("post-write", "errors");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

