Dippatel98 commented on code in PR #26970:
URL: https://github.com/apache/beam/pull/26970#discussion_r1218324568


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java:
##########
@@ -235,41 +264,58 @@ public void process(@DoFn.Element DataChangeRecord record, OutputReceiver<Row> r
                 .map(nonNullValues -> getGson().fromJson(nonNullValues, Map.class))
                 .orElseGet(Collections::emptyMap);
 
-        for (Map.Entry<String, String> valueEntry : newValues.entrySet()) {
-          if (valueEntry.getValue() == null) {
-            continue;
+        try {
+          for (Map.Entry<String, String> valueEntry : newValues.entrySet()) {
+            if (valueEntry.getValue() == null) {
+              continue;
+            }
+            // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+            rowBuilder =
+                rowBuilder.withFieldValue(
+                    valueEntry.getKey().toLowerCase(),
+                    stringToParsedValue(
+                        internalRowSchema.getField(valueEntry.getKey().toLowerCase()).getType(),
+                        valueEntry.getValue()));
           }
-          // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
-          rowBuilder =
-              rowBuilder.withFieldValue(
-                  valueEntry.getKey().toLowerCase(),
-                  stringToParsedValue(
-                      internalRowSchema.getField(valueEntry.getKey().toLowerCase()).getType(),
-                      valueEntry.getValue()));
-        }
 
-        for (Map.Entry<String, String> pkEntry : keyValues.entrySet()) {
-          if (pkEntry.getValue() == null) {
-            continue;
+          for (Map.Entry<String, String> pkEntry : keyValues.entrySet()) {
+            if (pkEntry.getValue() == null) {
+              continue;
+            }
+            // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+            rowBuilder =
+                rowBuilder.withFieldValue(
+                    pkEntry.getKey().toLowerCase(),
+                    stringToParsedValue(
+                        internalRowSchema.getField(pkEntry.getKey().toLowerCase()).getType(),
+                        pkEntry.getValue()));
           }
-          // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
-          rowBuilder =
-              rowBuilder.withFieldValue(
-                  pkEntry.getKey().toLowerCase(),
-                  stringToParsedValue(
-                      internalRowSchema.getField(pkEntry.getKey().toLowerCase()).getType(),
-                      pkEntry.getValue()));
+          receiver
+              .get(OUTPUT_TAG)
+              .outputWithTimestamp(
+                  Row.withSchema(tableChangeRecordSchema)
+                      .addValue(record.getModType().toString())
+                      .addValue(record.getCommitTimestamp().toString())
+                      .addValue(Long.parseLong(record.getRecordSequence()))
+                      .addValue(rowBuilder.build())
+                      .build(),
+                  timestamp);
+        } catch (Exception e) {
+          errorsInBundle += 1;

Review Comment:
   I found that the Spanner schema can change while this transform is running
(for example, when a column is added to the watched table). SpannerIO itself
handles this case, but our transform does not: a field that is missing from
`internalRowSchema` makes the `getField(...)` lookup fail. That is the only
failure mode I could come up with. As for `getKeysJson()` and
`getNewValuesJson()`, they return the Spanner row from the `Mod` as JSON, which
we then parse into a `Map` with Gson. Since the `Mod` is created by the I/O
itself, I don't expect formatting issues there, and converting the JSON to a
map should be straightforward.
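
A minimal sketch of the schema-change failure mode, under stated assumptions. It uses plain Java maps instead of Beam's `Schema`/`Row`. The names `SchemaDriftSketch`, `CAPTURED_SCHEMA`, `fieldType`, `parse` and `toRow` are hypothetical stand-ins for `internalRowSchema`, `stringToParsedValue` and the loop in the diff. It also assumes that looking up an unknown field name throws `IllegalArgumentException`, which is how Beam's `Schema.getField` appears to behave. A column added after the schema was captured makes the lookup fail, and the per-record `try`/`catch` counts the record instead of failing the bundle.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch, not Beam code: schema drift during a change stream read. */
public class SchemaDriftSketch {

  // Stand-in for internalRowSchema: lowercased column name -> type, captured once at startup.
  static final Map<String, String> CAPTURED_SCHEMA = Map.of("id", "INT64", "name", "STRING");

  // Mirrors errorsInBundle in the PR.
  static int errorsInBundle = 0;

  // Assumed to behave like Schema.getField: unknown names raise IllegalArgumentException.
  static String fieldType(String name) {
    String type = CAPTURED_SCHEMA.get(name);
    if (type == null) {
      throw new IllegalArgumentException("Cannot find field " + name);
    }
    return type;
  }

  // Stand-in for stringToParsedValue; only handles the two types in CAPTURED_SCHEMA.
  static Object parse(String type, String value) {
    return "INT64".equals(type) ? (Object) Long.parseLong(value) : value;
  }

  /** Builds one output row, or returns null and bumps the error counter if it cannot. */
  static Map<String, Object> toRow(Map<String, String> newValues) {
    try {
      Map<String, Object> row = new LinkedHashMap<>();
      for (Map.Entry<String, String> e : newValues.entrySet()) {
        if (e.getValue() == null) {
          continue; // Null columns are skipped, as in the PR.
        }
        String key = e.getKey().toLowerCase(Locale.ROOT);
        row.put(key, parse(fieldType(key), e.getValue()));
      }
      return row;
    } catch (IllegalArgumentException ex) {
      // Also covers NumberFormatException, which extends IllegalArgumentException.
      errorsInBundle += 1;
      return null;
    }
  }

  public static void main(String[] args) {
    // Record written before the schema change: every column is known.
    Map<String, Object> ok = toRow(Map.of("Id", "7", "Name", "alice"));
    // Record written after a hypothetical ADD COLUMN Email on the watched table.
    Map<String, Object> drifted = toRow(Map.of("Id", "8", "Email", "bob"));
    System.out.println(ok + " " + drifted + " errors=" + errorsInBundle);
  }
}
```

Unlike the diff, the sketch catches only `IllegalArgumentException`, so unrelated bugs still surface instead of being counted as bad records. Whether that is preferable to catching `Exception` is a judgment call.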



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.