dedocibula commented on code in PR #35408:
URL: https://github.com/apache/beam/pull/35408#discussion_r2165244622
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
return Collections.singletonList(
toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0),
resultSetMetadata));
}
- // In GoogleSQL, change stream records are returned as an array of structs.
+
+ // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records
are returned as Protos.
+ if (isProtoChangeRecord(resultSet)) {
+ return Arrays.asList(
+ fromProtoChangeStreamRecord(
+ partition, resultSetMetadata,
resultSet.getProtoChangeStreamRecord(0)));
+ }
+
+ // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are
returned as an array
+ // of structs.
return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
.flatMap(struct -> toChangeStreamRecord(partition, struct,
resultSetMetadata))
.collect(Collectors.toList());
}
+ boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) {
+ return currentRow.getColumnCount() == 1
+ && !currentRow.isNull(0)
+ && currentRow.getColumnType(0).getCode() ==
com.google.cloud.spanner.Type.Code.PROTO;
+ }
+
+ ChangeStreamRecord fromProtoChangeStreamRecord(
+ PartitionMetadata partition,
+ ChangeStreamResultSetMetadata resultSetMetadata,
+ com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto) {
+ if (changeStreamRecordProto.hasPartitionStartRecord()) {
+ return parseProtoPartitionStartRecord(
+ partition, resultSetMetadata,
changeStreamRecordProto.getPartitionStartRecord());
+ } else if (changeStreamRecordProto.hasPartitionEndRecord()) {
+ return parseProtoPartitionEndRecord(
+ partition, resultSetMetadata,
changeStreamRecordProto.getPartitionEndRecord());
+ } else if (changeStreamRecordProto.hasPartitionEventRecord()) {
+ return parseProtoPartitionEventRecord(
+ partition, resultSetMetadata,
changeStreamRecordProto.getPartitionEventRecord());
+ } else if (changeStreamRecordProto.hasHeartbeatRecord()) {
+ return parseProtoHeartbeatRecord(
+ partition, resultSetMetadata,
changeStreamRecordProto.getHeartbeatRecord());
+ } else if (changeStreamRecordProto.hasDataChangeRecord()) {
+ return parseProtoDataChangeRecord(
+ partition, resultSetMetadata,
changeStreamRecordProto.getDataChangeRecord());
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown change stream record type " +
changeStreamRecordProto.toString());
+ }
+ }
+
+ ChangeStreamRecord parseProtoPartitionStartRecord(
Review Comment:
Let's still preserve the `to<RecordType>` method naming here. Similarly, we may
want to use the Optional wrap with exception throwing for the fields that are
undefined, as we are doing in v1.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]