changliiu commented on code in PR #35408:
URL: https://github.com/apache/beam/pull/35408#discussion_r2167349337


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -223,12 +228,224 @@ public List<ChangeStreamRecord> toChangeStreamRecords(
       return Collections.singletonList(
           toChangeStreamRecordJson(partition, resultSet.getPgJsonb(0), resultSetMetadata));
     }
-    // In GoogleSQL, change stream records are returned as an array of structs.
+
+    // In GoogleSQL, for `IMMUTABLE_KEY_RANGE` option, change stream records are returned as Protos.
+    if (isProtoChangeRecord(resultSet)) {
+      return Arrays.asList(
+          fromProtoChangeStreamRecord(
+              partition, resultSetMetadata, resultSet.getProtoChangeStreamRecord(0)));
+    }
+
+    // In GoogleSQL, for `MUTABLE_KEY_RANGE` option, change stream records are returned as an array
+    // of structs.
     return resultSet.getCurrentRowAsStruct().getStructList(0).stream()
         .flatMap(struct -> toChangeStreamRecord(partition, struct, resultSetMetadata))
         .collect(Collectors.toList());
   }
 
+  boolean isProtoChangeRecord(ChangeStreamResultSet currentRow) {
+    return currentRow.getColumnCount() == 1
+        && !currentRow.isNull(0)
+        && currentRow.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO;
+  }
+
+  ChangeStreamRecord fromProtoChangeStreamRecord(
+      PartitionMetadata partition,
+      ChangeStreamResultSetMetadata resultSetMetadata,
+      com.google.spanner.v1.ChangeStreamRecord changeStreamRecordProto) {
+    if (changeStreamRecordProto.hasPartitionStartRecord()) {
+      return parseProtoPartitionStartRecord(
+          partition, resultSetMetadata, changeStreamRecordProto.getPartitionStartRecord());
+    } else if (changeStreamRecordProto.hasPartitionEndRecord()) {
+      return parseProtoPartitionEndRecord(
+          partition, resultSetMetadata, changeStreamRecordProto.getPartitionEndRecord());
+    } else if (changeStreamRecordProto.hasPartitionEventRecord()) {
+      return parseProtoPartitionEventRecord(
+          partition, resultSetMetadata, changeStreamRecordProto.getPartitionEventRecord());
+    } else if (changeStreamRecordProto.hasHeartbeatRecord()) {
+      return parseProtoHeartbeatRecord(
+          partition, resultSetMetadata, changeStreamRecordProto.getHeartbeatRecord());
+    } else if (changeStreamRecordProto.hasDataChangeRecord()) {
+      return parseProtoDataChangeRecord(
+          partition, resultSetMetadata, changeStreamRecordProto.getDataChangeRecord());
+    } else {
+      throw new IllegalArgumentException(
+          "Unknown change stream record type " + changeStreamRecordProto.toString());
+    }
+  }
+
+  ChangeStreamRecord parseProtoPartitionStartRecord(

Review Comment:
   Only the JSON return type uses the Optional wrapper with an exception, 
because we have little control over what is inside the JSON payload. With 
Struct (v1 GSQL) or PROTO (v2 GSQL), however, we have strong control over the 
content. Hence I don't think we need the extra caution of the Optional wrapper 
that we added earlier for the JSON return type.
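   
   To make the contrast concrete, here is a rough sketch. It is not the PR's 
code: `FieldAccessSketch`, `TypedRecord`, and the `partition_token` key are 
hypothetical stand-ins, and a plain `Map` stands in for the parsed JSON. The 
loosely typed path guards each lookup, while the schema-backed path reads the 
field directly.
   
   ```java
   import java.util.Map;
   import java.util.Optional;

   class FieldAccessSketch {
     // Hypothetical stand-in for a schema-backed record (Struct or proto):
     // the field is assumed to be guaranteed by the schema.
     static final class TypedRecord {
       private final String partitionToken;

       TypedRecord(String partitionToken) {
         this.partitionToken = partitionToken;
       }

       String getPartitionToken() {
         return partitionToken;
       }
     }

     // JSON-like path: the content is loosely controlled, so wrap the lookup
     // in Optional and throw when the expected field is missing.
     static String tokenFromJson(Map<String, Object> json) {
       return Optional.ofNullable(json.get("partition_token"))
           .map(Object::toString)
           .orElseThrow(
               () -> new IllegalArgumentException("Missing field partition_token"));
     }

     // Typed path: no Optional wrapper, just read the field.
     static String tokenFromTyped(TypedRecord record) {
       return record.getPartitionToken();
     }
   }
   ```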
   
   Please re-open if you have a different opinion. If so, we can discuss offline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
