johnjcasey commented on code in PR #26970:
URL: https://github.com/apache/beam/pull/26970#discussion_r1218307979
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java:
##########
@@ -193,15 +218,19 @@ public abstract static class Builder {
}
}
- private static final class DataChangeRecordToRow extends DoFn<DataChangeRecord, Row> {
+ public static final class DataChangeRecordToRow extends DoFn<DataChangeRecord, Row> {
Review Comment:
Why was this changed to public?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.java:
##########
@@ -235,41 +264,58 @@ public void process(@DoFn.Element DataChangeRecord record, OutputReceiver<Row> r
.map(nonNullValues -> getGson().fromJson(nonNullValues, Map.class))
.orElseGet(Collections::emptyMap);
- for (Map.Entry<String, String> valueEntry : newValues.entrySet()) {
- if (valueEntry.getValue() == null) {
- continue;
+ try {
+ for (Map.Entry<String, String> valueEntry : newValues.entrySet()) {
+ if (valueEntry.getValue() == null) {
+ continue;
+ }
+ // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+ rowBuilder =
+ rowBuilder.withFieldValue(
+ valueEntry.getKey().toLowerCase(),
+ stringToParsedValue(
+ internalRowSchema.getField(valueEntry.getKey().toLowerCase()).getType(),
+ valueEntry.getValue()));
}
- // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
- rowBuilder =
- rowBuilder.withFieldValue(
- valueEntry.getKey().toLowerCase(),
- stringToParsedValue(
- internalRowSchema.getField(valueEntry.getKey().toLowerCase()).getType(),
- valueEntry.getValue()));
- }
- for (Map.Entry<String, String> pkEntry : keyValues.entrySet()) {
- if (pkEntry.getValue() == null) {
- continue;
+ for (Map.Entry<String, String> pkEntry : keyValues.entrySet()) {
+ if (pkEntry.getValue() == null) {
+ continue;
+ }
+ // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
+ rowBuilder =
+ rowBuilder.withFieldValue(
+ pkEntry.getKey().toLowerCase(),
+ stringToParsedValue(
+ internalRowSchema.getField(pkEntry.getKey().toLowerCase()).getType(),
+ pkEntry.getValue()));
}
- // TODO(pabloem): Understand why SpannerSchema has field names in lowercase...
- rowBuilder =
- rowBuilder.withFieldValue(
- pkEntry.getKey().toLowerCase(),
- stringToParsedValue(
- internalRowSchema.getField(pkEntry.getKey().toLowerCase()).getType(),
- pkEntry.getValue()));
+ receiver
+ .get(OUTPUT_TAG)
+ .outputWithTimestamp(
+ Row.withSchema(tableChangeRecordSchema)
+ .addValue(record.getModType().toString())
+ .addValue(record.getCommitTimestamp().toString())
+ .addValue(Long.parseLong(record.getRecordSequence()))
+ .addValue(rowBuilder.build())
+ .build(),
+ timestamp);
+ } catch (Exception e) {
+ errorsInBundle += 1;
Review Comment:
Is it possible for anything here to throw an exception? I'm especially
looking at .getKeysJson and .getNewValuesJson. If so, we would fail to output
to the error queue.
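
One way to read the concern above: a call that throws before the try block starts (the Gson parsing in the context lines of this hunk precedes it), or inside the catch handler itself, never reaches the error output and fails the bundle instead. The following is a minimal, Beam-free sketch of the general pattern, not the code in this PR. `ErrorRoutingSketch`, `Result`, `convertAll`, and `parseKeyValue` are hypothetical names, and the two lists only stand in for the main and error outputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a DoFn with a main output and an error output.
final class ErrorRoutingSketch {

  /** Holds successfully converted records and failures separately. */
  public static final class Result {
    public final List<Map<String, String>> output = new ArrayList<>();
    public final List<String> errors = new ArrayList<>();
  }

  /** Converts each raw record; any failure in parsing is routed to errors. */
  public static Result convertAll(
      List<String> rawRecords, Function<String, Map<String, String>> parser) {
    Result result = new Result();
    for (String raw : rawRecords) {
      try {
        // Parsing happens inside the try block, so a malformed payload
        // is caught here rather than propagating out of the loop.
        result.output.add(parser.apply(raw));
      } catch (RuntimeException e) {
        // The handler only touches values that cannot throw: the raw
        // input string and the exception message.
        result.errors.add(raw + ": " + e.getMessage());
      }
    }
    return result;
  }

  /** Hypothetical parser for "key=value" strings; throws on malformed input. */
  public static Map<String, String> parseKeyValue(String raw) {
    int idx = raw.indexOf('=');
    if (idx < 0) {
      throw new IllegalArgumentException("missing '='");
    }
    return Collections.singletonMap(raw.substring(0, idx), raw.substring(idx + 1));
  }

  public static void main(String[] args) {
    List<String> input = new ArrayList<>();
    input.add("id=1");
    input.add("broken");
    Result r = convertAll(input, ErrorRoutingSketch::parseKeyValue);
    System.out.println(r.output.size() + " converted, " + r.errors.size() + " routed to errors");
  }
}
```

Applied to this hunk, the same idea would mean moving the JSON parsing of the keys and new values inside the try, and building the error record only from values that are already known to be safe to read. Whether that fits the transform's error schema is for the PR author to judge.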
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.