This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bafabc  Merge pull request #16726 from [BEAM-12164]: Parses change streams fields as json / strings
7bafabc is described below

commit 7bafabc19e6e37c200da039bc0417da4bb4949c4
Author: Thiago Nunes <[email protected]>
AuthorDate: Mon Feb 7 13:42:55 2022 +1100

    Merge pull request #16726 from [BEAM-12164]: Parses change streams fields as json / strings
    
    The backend is migrating from returning strings for certain fields to
    returning JSON. We need to change the parsing logic to accommodate both
    representations until the migration is complete.
---
 .../mapper/ChangeStreamRecordMapper.java           |  34 ++++--
 .../mapper/ChangeStreamRecordMapperTest.java       |  36 +++---
 .../changestreams/util/TestStructMapper.java       | 126 ++++++++++++++++-----
 3 files changed, 143 insertions(+), 53 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 1d81313..4368ba7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -19,6 +19,8 @@ package 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
 
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
 import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -277,22 +279,25 @@ public class ChangeStreamRecordMapper {
   }
 
   private ColumnType columnTypeFrom(Struct struct) {
+    // TODO: Move to type struct.getJson when backend is fully migrated
+    final String type = getJsonString(struct.getValue(TYPE_COLUMN));
     return new ColumnType(
         struct.getString(NAME_COLUMN),
-        new TypeCode(struct.getString(TYPE_COLUMN)),
+        new TypeCode(type),
         struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
         struct.getLong(ORDINAL_POSITION_COLUMN));
   }
 
   private Mod modFrom(Struct struct) {
-    final String keysJson = struct.getString(KEYS_COLUMN);
-    final String oldValuesJson =
-        struct.isNull(OLD_VALUES_COLUMN) ? null : 
struct.getString(OLD_VALUES_COLUMN);
-    final String newValuesJson =
-        struct.isNull(NEW_VALUES_COLUMN)
-            ? null
-            : struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN);
-    return new Mod(keysJson, oldValuesJson, newValuesJson);
+    // TODO: Move to keys struct.getJson when backend is fully migrated
+    final String keys = getJsonString(struct.getValue(KEYS_COLUMN));
+    // TODO: Move to oldValues struct.getJson when backend is fully migrated
+    final String oldValues =
+        struct.isNull(OLD_VALUES_COLUMN) ? null : 
getJsonString(struct.getValue(OLD_VALUES_COLUMN));
+    // TODO: Move to newValues struct.getJson when backend is fully migrated
+    final String newValues =
+        struct.isNull(NEW_VALUES_COLUMN) ? null : 
getJsonString(struct.getValue(NEW_VALUES_COLUMN));
+    return new Mod(keys, oldValues, newValues);
   }
 
   private ChildPartition childPartitionFrom(String partitionToken, Struct 
struct) {
@@ -324,4 +329,15 @@ public class ChangeStreamRecordMapper {
         .withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
         .build();
   }
+
+  // TODO: Remove when backend is fully migrated to JSON
+  private String getJsonString(Value value) {
+    if (value.getType().equals(Type.json())) {
+      return value.getJson();
+    } else if (value.getType().equals(Type.string())) {
+      return value.getString();
+    } else {
+      throw new IllegalArgumentException("Can not extract string from value " 
+ value);
+    }
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index 26609f6..50f4292 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
 
-import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -98,11 +99,15 @@ public class ChangeStreamRecordMapperTest {
             10L,
             2L,
             null);
-    final Struct struct = recordsToStruct(dataChangeRecord);
+    final Struct stringFieldsStruct = 
recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
 
     assertEquals(
         Collections.singletonList(dataChangeRecord),
-        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, 
resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, 
resultSetMetadata));
   }
 
   @Test
@@ -125,11 +130,15 @@ public class ChangeStreamRecordMapperTest {
             10L,
             2L,
             null);
-    final Struct struct = recordsToStruct(dataChangeRecord);
+    final Struct stringFieldsStruct = 
recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
 
     assertEquals(
         Collections.singletonList(dataChangeRecord),
-        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, 
resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, 
resultSetMetadata));
   }
 
   @Test
@@ -152,18 +161,22 @@ public class ChangeStreamRecordMapperTest {
             10L,
             2L,
             null);
-    final Struct struct = recordsToStruct(dataChangeRecord);
+    final Struct stringFieldsStruct = 
recordsToStructWithStrings(dataChangeRecord);
+    final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
 
     assertEquals(
         Collections.singletonList(dataChangeRecord),
-        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+        mapper.toChangeStreamRecords(partition, stringFieldsStruct, 
resultSetMetadata));
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, jsonFieldsStruct, 
resultSetMetadata));
   }
 
   @Test
   public void testMappingStructRowToHeartbeatRecord() {
     final HeartbeatRecord heartbeatRecord =
         new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
-    final Struct struct = recordsToStruct(heartbeatRecord);
+    final Struct struct = recordsToStructWithStrings(heartbeatRecord);
 
     assertEquals(
         Collections.singletonList(heartbeatRecord),
@@ -180,7 +193,7 @@ public class ChangeStreamRecordMapperTest {
                 new ChildPartition("childToken1", 
Sets.newHashSet("parentToken1", "parentToken2")),
                 new ChildPartition("childToken2", 
Sets.newHashSet("parentToken1", "parentToken2"))),
             null);
-    final Struct struct = recordsToStruct(childPartitionsRecord);
+    final Struct struct = recordsToStructWithStrings(childPartitionsRecord);
 
     assertEquals(
         Collections.singletonList(childPartitionsRecord),
@@ -191,7 +204,7 @@ public class ChangeStreamRecordMapperTest {
   @Test
   public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() 
{
     final Struct struct =
-        recordsToStruct(
+        recordsToStructWithStrings(
             new ChildPartitionsRecord(
                 Timestamp.ofTimeSecondsAndNanos(10L, 20),
                 "1",
@@ -217,7 +230,4 @@ public class ChangeStreamRecordMapperTest {
         Collections.singletonList(expected),
         mapper.toChangeStreamRecords(initialPartition, struct, 
resultSetMetadata));
   }
-
-  // TODO: Add test case for unknown record type
-  // TODO: Add test case for malformed record
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
index b699e0a..a3a434c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -38,26 +38,53 @@ public class TestStructMapper {
       Type.struct(
           StructField.of("token", Type.string()),
           StructField.of("parent_partition_tokens", 
Type.array(Type.string())));
-  private static final Type COLUMN_TYPE_TYPE =
+  // TODO: Remove COLUMN_TYPE_STRING_TYPE when backend has fully migrated to 
use JSON
+  private static final Type COLUMN_TYPE_STRING_TYPE =
       Type.struct(
           StructField.of("name", Type.string()),
           StructField.of("type", Type.string()),
           StructField.of("is_primary_key", Type.bool()),
           StructField.of("ordinal_position", Type.int64()));
-  private static final Type MOD_TYPE =
+  private static final Type COLUMN_TYPE_JSON_TYPE =
+      Type.struct(
+          StructField.of("name", Type.string()),
+          StructField.of("type", Type.json()),
+          StructField.of("is_primary_key", Type.bool()),
+          StructField.of("ordinal_position", Type.int64()));
+  // TODO: Remove MOD_STRING_TYPE when backend has fully migrated to use JSON
+  private static final Type MOD_STRING_TYPE =
       Type.struct(
           StructField.of("keys", Type.string()),
           StructField.of("new_values", Type.string()),
           StructField.of("old_values", Type.string()));
-  private static final Type DATA_CHANGE_RECORD_TYPE =
+  private static final Type MOD_JSON_TYPE =
+      Type.struct(
+          StructField.of("keys", Type.json()),
+          StructField.of("new_values", Type.json()),
+          StructField.of("old_values", Type.json()));
+  // TODO: Remove DATA_CHANGE_RECORD_STRING_TYPE when backend has fully 
migrated to use JSON
+  private static final Type DATA_CHANGE_RECORD_STRING_TYPE =
+      Type.struct(
+          StructField.of("commit_timestamp", Type.timestamp()),
+          StructField.of("record_sequence", Type.string()),
+          StructField.of("server_transaction_id", Type.string()),
+          StructField.of("is_last_record_in_transaction_in_partition", 
Type.bool()),
+          StructField.of("table_name", Type.string()),
+          StructField.of("column_types", Type.array(COLUMN_TYPE_STRING_TYPE)),
+          StructField.of("mods", Type.array(MOD_STRING_TYPE)),
+          StructField.of("mod_type", Type.string()),
+          StructField.of("value_capture_type", Type.string()),
+          StructField.of("number_of_records_in_transaction", Type.int64()),
+          StructField.of("number_of_partitions_in_transaction", Type.int64()));
+  private static final Type DATA_CHANGE_RECORD_JSON_TYPE =
       Type.struct(
           StructField.of("commit_timestamp", Type.timestamp()),
           StructField.of("record_sequence", Type.string()),
           StructField.of("server_transaction_id", Type.string()),
           StructField.of("is_last_record_in_transaction_in_partition", 
Type.bool()),
           StructField.of("table_name", Type.string()),
-          StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
-          StructField.of("mods", Type.array(MOD_TYPE)),
+          StructField.of("column_types", Type.array(COLUMN_TYPE_JSON_TYPE)),
+          StructField.of("mods", Type.array(MOD_JSON_TYPE)),
           StructField.of("mod_type", Type.string()),
           StructField.of("value_capture_type", Type.string()),
           StructField.of("number_of_records_in_transaction", Type.int64()),
@@ -69,39 +96,59 @@ public class TestStructMapper {
           StructField.of("start_timestamp", Type.timestamp()),
           StructField.of("record_sequence", Type.string()),
           StructField.of("child_partitions", 
Type.array(CHILD_PARTITION_TYPE)));
-  private static final Type STREAM_RECORD_TYPE =
+  // TODO: Remove STREAM_RECORD_STRING_TYPE when backend has fully migrated to 
use JSON
+  private static final Type STREAM_RECORD_STRING_TYPE =
       Type.struct(
-          StructField.of("data_change_record", 
Type.array(DATA_CHANGE_RECORD_TYPE)),
+          StructField.of("data_change_record", 
Type.array(DATA_CHANGE_RECORD_STRING_TYPE)),
           StructField.of("heartbeat_record", 
Type.array(HEARTBEAT_RECORD_TYPE)),
           StructField.of("child_partitions_record", 
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+  private static final Type STREAM_RECORD_JSON_TYPE =
+      Type.struct(
+          StructField.of("data_change_record", 
Type.array(DATA_CHANGE_RECORD_JSON_TYPE)),
+          StructField.of("heartbeat_record", 
Type.array(HEARTBEAT_RECORD_TYPE)),
+          StructField.of("child_partitions_record", 
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+
+  public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
+    return recordsToStruct(true, records);
+  }
+
+  // TODO: Remove when backend is fully migrated to JSON
+  public static Struct recordsToStructWithStrings(ChangeStreamRecord... 
records) {
+    return recordsToStruct(false, records);
+  }
 
-  public static Struct recordsToStruct(ChangeStreamRecord... records) {
+  private static Struct recordsToStruct(boolean useJsonFields, 
ChangeStreamRecord... records) {
+    final Type streamRecordType =
+        useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
     return Struct.newBuilder()
         .add(
             Value.structArray(
-                STREAM_RECORD_TYPE,
+                streamRecordType,
                 Arrays.stream(records)
-                    .map(TestStructMapper::streamRecordStructFrom)
+                    .map(record -> 
TestStructMapper.streamRecordStructFrom(record, useJsonFields))
                     .collect(Collectors.toList())))
         .build();
   }
 
-  private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
+  private static Struct streamRecordStructFrom(ChangeStreamRecord record, 
boolean useJsonFields) {
     if (record instanceof DataChangeRecord) {
-      return streamRecordStructFrom((DataChangeRecord) record);
+      return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
     } else if (record instanceof HeartbeatRecord) {
-      return streamRecordStructFrom((HeartbeatRecord) record);
+      return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
     } else if (record instanceof ChildPartitionsRecord) {
-      return streamRecordStructFrom((ChildPartitionsRecord) record);
+      return streamRecordStructFrom((ChildPartitionsRecord) record, 
useJsonFields);
     } else {
       throw new UnsupportedOperationException("Unimplemented mapping for " + 
record.getClass());
     }
   }
 
-  private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
+  private static Struct streamRecordStructFrom(
+      ChildPartitionsRecord record, boolean useJsonFields) {
+    final Type dataChangeRecordType =
+        useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : 
DATA_CHANGE_RECORD_STRING_TYPE;
     return Struct.newBuilder()
         .set("data_change_record")
-        .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, 
Collections.emptyList()))
+        .to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
         .set("heartbeat_record")
         .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
         .set("child_partitions_record")
@@ -128,10 +175,12 @@ public class TestStructMapper {
         .build();
   }
 
-  private static Struct streamRecordStructFrom(HeartbeatRecord record) {
+  private static Struct streamRecordStructFrom(HeartbeatRecord record, boolean 
useJsonFields) {
+    final Type dataChangeRecordType =
+        useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : 
DATA_CHANGE_RECORD_STRING_TYPE;
     return Struct.newBuilder()
         .set("data_change_record")
-        .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, 
Collections.emptyList()))
+        .to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
         .set("heartbeat_record")
         .to(
             Value.structArray(
@@ -145,12 +194,15 @@ public class TestStructMapper {
     return 
Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
   }
 
-  private static Struct streamRecordStructFrom(DataChangeRecord record) {
+  private static Struct streamRecordStructFrom(DataChangeRecord record, 
boolean useJsonFields) {
+    final Type dataChangeRecordType =
+        useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : 
DATA_CHANGE_RECORD_STRING_TYPE;
     return Struct.newBuilder()
         .set("data_change_record")
         .to(
             Value.structArray(
-                DATA_CHANGE_RECORD_TYPE, 
Collections.singletonList(recordStructFrom(record))))
+                dataChangeRecordType,
+                Collections.singletonList(recordStructFrom(record, 
useJsonFields))))
         .set("heartbeat_record")
         .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
         .set("child_partitions_record")
@@ -158,18 +210,20 @@ public class TestStructMapper {
         .build();
   }
 
-  private static Struct recordStructFrom(DataChangeRecord record) {
+  private static Struct recordStructFrom(DataChangeRecord record, boolean 
useJsonFields) {
+    final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : 
COLUMN_TYPE_STRING_TYPE;
+    final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
     final Value columnTypes =
         Value.structArray(
-            COLUMN_TYPE_TYPE,
+            columnTypeType,
             record.getRowType().stream()
-                .map(TestStructMapper::columnTypeStructFrom)
+                .map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, 
useJsonFields))
                 .collect(Collectors.toList()));
     final Value mods =
         Value.structArray(
-            MOD_TYPE,
+            modType,
             record.getMods().stream()
-                .map(TestStructMapper::modStructFrom)
+                .map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
                 .collect(Collectors.toList()));
     return Struct.newBuilder()
         .set("commit_timestamp")
@@ -197,12 +251,16 @@ public class TestStructMapper {
         .build();
   }
 
-  private static Struct columnTypeStructFrom(ColumnType columnType) {
+  private static Struct columnTypeStructFrom(ColumnType columnType, boolean 
useJsonFields) {
+    final Value type =
+        useJsonFields
+            ? Value.json(columnType.getType().getCode())
+            : Value.string(columnType.getType().getCode());
     return Struct.newBuilder()
         .set("name")
         .to(columnType.getName())
         .set("type")
-        .to(columnType.getType().getCode())
+        .to(type)
         .set("is_primary_key")
         .to(columnType.isPrimaryKey())
         .set("ordinal_position")
@@ -210,14 +268,20 @@ public class TestStructMapper {
         .build();
   }
 
-  private static Struct modStructFrom(Mod mod) {
+  private static Struct modStructFrom(Mod mod, boolean useJsonFields) {
+    final Value keys =
+        useJsonFields ? Value.json(mod.getKeysJson()) : 
Value.string(mod.getKeysJson());
+    final Value newValues =
+        useJsonFields ? Value.json(mod.getNewValuesJson()) : 
Value.string(mod.getNewValuesJson());
+    final Value oldValues =
+        useJsonFields ? Value.json(mod.getOldValuesJson()) : 
Value.string(mod.getOldValuesJson());
     return Struct.newBuilder()
         .set("keys")
-        .to(mod.getKeysJson())
+        .to(keys)
         .set("new_values")
-        .to(mod.getNewValuesJson())
+        .to(newValues)
         .set("old_values")
-        .to(mod.getOldValuesJson())
+        .to(oldValues)
         .build();
   }
 

Reply via email to