This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7bafabc Merge pull request #16726 from [BEAM-12164]: Parses change
streams fields as json / strings
7bafabc is described below
commit 7bafabc19e6e37c200da039bc0417da4bb4949c4
Author: Thiago Nunes <[email protected]>
AuthorDate: Mon Feb 7 13:42:55 2022 +1100
Merge pull request #16726 from [BEAM-12164]: Parses change streams fields
as json / strings
The backend is migrating from returning strings for certain fields to
json. We need to change the parsing logic to accommodate both until
the migration is completed.
---
.../mapper/ChangeStreamRecordMapper.java | 34 ++++--
.../mapper/ChangeStreamRecordMapperTest.java | 36 +++---
.../changestreams/util/TestStructMapper.java | 126 ++++++++++++++++-----
3 files changed, 143 insertions(+), 53 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
index 1d81313..4368ba7 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -19,6 +19,8 @@ package
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Value;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
@@ -277,22 +279,25 @@ public class ChangeStreamRecordMapper {
}
private ColumnType columnTypeFrom(Struct struct) {
+ // TODO: Move to type struct.getJson when backend is fully migrated
+ final String type = getJsonString(struct.getValue(TYPE_COLUMN));
return new ColumnType(
struct.getString(NAME_COLUMN),
- new TypeCode(struct.getString(TYPE_COLUMN)),
+ new TypeCode(type),
struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
struct.getLong(ORDINAL_POSITION_COLUMN));
}
private Mod modFrom(Struct struct) {
- final String keysJson = struct.getString(KEYS_COLUMN);
- final String oldValuesJson =
- struct.isNull(OLD_VALUES_COLUMN) ? null :
struct.getString(OLD_VALUES_COLUMN);
- final String newValuesJson =
- struct.isNull(NEW_VALUES_COLUMN)
- ? null
- : struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN);
- return new Mod(keysJson, oldValuesJson, newValuesJson);
+ // TODO: Move to keys struct.getJson when backend is fully migrated
+ final String keys = getJsonString(struct.getValue(KEYS_COLUMN));
+ // TODO: Move to oldValues struct.getJson when backend is fully migrated
+ final String oldValues =
+ struct.isNull(OLD_VALUES_COLUMN) ? null :
getJsonString(struct.getValue(OLD_VALUES_COLUMN));
+ // TODO: Move to newValues struct.getJson when backend is fully migrated
+ final String newValues =
+ struct.isNull(NEW_VALUES_COLUMN) ? null :
getJsonString(struct.getValue(NEW_VALUES_COLUMN));
+ return new Mod(keys, oldValues, newValues);
}
private ChildPartition childPartitionFrom(String partitionToken, Struct
struct) {
@@ -324,4 +329,15 @@ public class ChangeStreamRecordMapper {
.withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
.build();
}
+
+ // TODO: Remove when backend is fully migrated to JSON
+ private String getJsonString(Value value) {
+ if (value.getType().equals(Type.json())) {
+ return value.getJson();
+ } else if (value.getType().equals(Type.string())) {
+ return value.getString();
+ } else {
+ throw new IllegalArgumentException("Can not extract string from value "
+ value);
+ }
+ }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
index 26609f6..50f4292 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -17,7 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
-import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -98,11 +99,15 @@ public class ChangeStreamRecordMapperTest {
10L,
2L,
null);
- final Struct struct = recordsToStruct(dataChangeRecord);
+ final Struct stringFieldsStruct =
recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
assertEquals(
Collections.singletonList(dataChangeRecord),
- mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct,
resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct,
resultSetMetadata));
}
@Test
@@ -125,11 +130,15 @@ public class ChangeStreamRecordMapperTest {
10L,
2L,
null);
- final Struct struct = recordsToStruct(dataChangeRecord);
+ final Struct stringFieldsStruct =
recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
assertEquals(
Collections.singletonList(dataChangeRecord),
- mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct,
resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct,
resultSetMetadata));
}
@Test
@@ -152,18 +161,22 @@ public class ChangeStreamRecordMapperTest {
10L,
2L,
null);
- final Struct struct = recordsToStruct(dataChangeRecord);
+ final Struct stringFieldsStruct =
recordsToStructWithStrings(dataChangeRecord);
+ final Struct jsonFieldsStruct = recordsToStructWithJson(dataChangeRecord);
assertEquals(
Collections.singletonList(dataChangeRecord),
- mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+ mapper.toChangeStreamRecords(partition, stringFieldsStruct,
resultSetMetadata));
+ assertEquals(
+ Collections.singletonList(dataChangeRecord),
+ mapper.toChangeStreamRecords(partition, jsonFieldsStruct,
resultSetMetadata));
}
@Test
public void testMappingStructRowToHeartbeatRecord() {
final HeartbeatRecord heartbeatRecord =
new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
- final Struct struct = recordsToStruct(heartbeatRecord);
+ final Struct struct = recordsToStructWithStrings(heartbeatRecord);
assertEquals(
Collections.singletonList(heartbeatRecord),
@@ -180,7 +193,7 @@ public class ChangeStreamRecordMapperTest {
new ChildPartition("childToken1",
Sets.newHashSet("parentToken1", "parentToken2")),
new ChildPartition("childToken2",
Sets.newHashSet("parentToken1", "parentToken2"))),
null);
- final Struct struct = recordsToStruct(childPartitionsRecord);
+ final Struct struct = recordsToStructWithStrings(childPartitionsRecord);
assertEquals(
Collections.singletonList(childPartitionsRecord),
@@ -191,7 +204,7 @@ public class ChangeStreamRecordMapperTest {
@Test
public void testMappingStructRowFromInitialPartitionToChildPartitionRecord()
{
final Struct struct =
- recordsToStruct(
+ recordsToStructWithStrings(
new ChildPartitionsRecord(
Timestamp.ofTimeSecondsAndNanos(10L, 20),
"1",
@@ -217,7 +230,4 @@ public class ChangeStreamRecordMapperTest {
Collections.singletonList(expected),
mapper.toChangeStreamRecords(initialPartition, struct,
resultSetMetadata));
}
-
- // TODO: Add test case for unknown record type
- // TODO: Add test case for malformed record
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
index b699e0a..a3a434c 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -38,26 +38,53 @@ public class TestStructMapper {
Type.struct(
StructField.of("token", Type.string()),
StructField.of("parent_partition_tokens",
Type.array(Type.string())));
- private static final Type COLUMN_TYPE_TYPE =
+ // TODO: Remove COLUMN_TYPE_STRING_TYPE when backend has fully migrated to
use JSON
+ private static final Type COLUMN_TYPE_STRING_TYPE =
Type.struct(
StructField.of("name", Type.string()),
StructField.of("type", Type.string()),
StructField.of("is_primary_key", Type.bool()),
StructField.of("ordinal_position", Type.int64()));
- private static final Type MOD_TYPE =
+ private static final Type COLUMN_TYPE_JSON_TYPE =
+ Type.struct(
+ StructField.of("name", Type.string()),
+ StructField.of("type", Type.json()),
+ StructField.of("is_primary_key", Type.bool()),
+ StructField.of("ordinal_position", Type.int64()));
+ // TODO: Remove MOD_STRING_TYPE when backend has fully migrated to use JSON
+ private static final Type MOD_STRING_TYPE =
Type.struct(
StructField.of("keys", Type.string()),
StructField.of("new_values", Type.string()),
StructField.of("old_values", Type.string()));
- private static final Type DATA_CHANGE_RECORD_TYPE =
+ private static final Type MOD_JSON_TYPE =
+ Type.struct(
+ StructField.of("keys", Type.json()),
+ StructField.of("new_values", Type.json()),
+ StructField.of("old_values", Type.json()));
+ // TODO: Remove DATA_CHANGE_RECORD_STRING_TYPE when backend has fully
migrated to use JSON
+ private static final Type DATA_CHANGE_RECORD_STRING_TYPE =
+ Type.struct(
+ StructField.of("commit_timestamp", Type.timestamp()),
+ StructField.of("record_sequence", Type.string()),
+ StructField.of("server_transaction_id", Type.string()),
+ StructField.of("is_last_record_in_transaction_in_partition",
Type.bool()),
+ StructField.of("table_name", Type.string()),
+ StructField.of("column_types", Type.array(COLUMN_TYPE_STRING_TYPE)),
+ StructField.of("mods", Type.array(MOD_STRING_TYPE)),
+ StructField.of("mod_type", Type.string()),
+ StructField.of("value_capture_type", Type.string()),
+ StructField.of("number_of_records_in_transaction", Type.int64()),
+ StructField.of("number_of_partitions_in_transaction", Type.int64()));
+ private static final Type DATA_CHANGE_RECORD_JSON_TYPE =
Type.struct(
StructField.of("commit_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("server_transaction_id", Type.string()),
StructField.of("is_last_record_in_transaction_in_partition",
Type.bool()),
StructField.of("table_name", Type.string()),
- StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
- StructField.of("mods", Type.array(MOD_TYPE)),
+ StructField.of("column_types", Type.array(COLUMN_TYPE_JSON_TYPE)),
+ StructField.of("mods", Type.array(MOD_JSON_TYPE)),
StructField.of("mod_type", Type.string()),
StructField.of("value_capture_type", Type.string()),
StructField.of("number_of_records_in_transaction", Type.int64()),
@@ -69,39 +96,59 @@ public class TestStructMapper {
StructField.of("start_timestamp", Type.timestamp()),
StructField.of("record_sequence", Type.string()),
StructField.of("child_partitions",
Type.array(CHILD_PARTITION_TYPE)));
- private static final Type STREAM_RECORD_TYPE =
+ // TODO: Remove STREAM_RECORD_STRING_TYPE when backend has fully migrated to
use JSON
+ private static final Type STREAM_RECORD_STRING_TYPE =
Type.struct(
- StructField.of("data_change_record",
Type.array(DATA_CHANGE_RECORD_TYPE)),
+ StructField.of("data_change_record",
Type.array(DATA_CHANGE_RECORD_STRING_TYPE)),
StructField.of("heartbeat_record",
Type.array(HEARTBEAT_RECORD_TYPE)),
StructField.of("child_partitions_record",
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+ private static final Type STREAM_RECORD_JSON_TYPE =
+ Type.struct(
+ StructField.of("data_change_record",
Type.array(DATA_CHANGE_RECORD_JSON_TYPE)),
+ StructField.of("heartbeat_record",
Type.array(HEARTBEAT_RECORD_TYPE)),
+ StructField.of("child_partitions_record",
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+
+ public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
+ return recordsToStruct(true, records);
+ }
+
+ // TODO: Remove when backend is fully migrated to JSON
+ public static Struct recordsToStructWithStrings(ChangeStreamRecord...
records) {
+ return recordsToStruct(false, records);
+ }
- public static Struct recordsToStruct(ChangeStreamRecord... records) {
+ private static Struct recordsToStruct(boolean useJsonFields,
ChangeStreamRecord... records) {
+ final Type streamRecordType =
+ useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
return Struct.newBuilder()
.add(
Value.structArray(
- STREAM_RECORD_TYPE,
+ streamRecordType,
Arrays.stream(records)
- .map(TestStructMapper::streamRecordStructFrom)
+ .map(record ->
TestStructMapper.streamRecordStructFrom(record, useJsonFields))
.collect(Collectors.toList())))
.build();
}
- private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
+ private static Struct streamRecordStructFrom(ChangeStreamRecord record,
boolean useJsonFields) {
if (record instanceof DataChangeRecord) {
- return streamRecordStructFrom((DataChangeRecord) record);
+ return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
} else if (record instanceof HeartbeatRecord) {
- return streamRecordStructFrom((HeartbeatRecord) record);
+ return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
} else if (record instanceof ChildPartitionsRecord) {
- return streamRecordStructFrom((ChildPartitionsRecord) record);
+ return streamRecordStructFrom((ChildPartitionsRecord) record,
useJsonFields);
} else {
throw new UnsupportedOperationException("Unimplemented mapping for " +
record.getClass());
}
}
- private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
+ private static Struct streamRecordStructFrom(
+ ChildPartitionsRecord record, boolean useJsonFields) {
+ final Type dataChangeRecordType =
+ useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE :
DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
- .to(Value.structArray(DATA_CHANGE_RECORD_TYPE,
Collections.emptyList()))
+ .to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
@@ -128,10 +175,12 @@ public class TestStructMapper {
.build();
}
- private static Struct streamRecordStructFrom(HeartbeatRecord record) {
+ private static Struct streamRecordStructFrom(HeartbeatRecord record, boolean
useJsonFields) {
+ final Type dataChangeRecordType =
+ useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE :
DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
- .to(Value.structArray(DATA_CHANGE_RECORD_TYPE,
Collections.emptyList()))
+ .to(Value.structArray(dataChangeRecordType, Collections.emptyList()))
.set("heartbeat_record")
.to(
Value.structArray(
@@ -145,12 +194,15 @@ public class TestStructMapper {
return
Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
}
- private static Struct streamRecordStructFrom(DataChangeRecord record) {
+ private static Struct streamRecordStructFrom(DataChangeRecord record,
boolean useJsonFields) {
+ final Type dataChangeRecordType =
+ useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE :
DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(
Value.structArray(
- DATA_CHANGE_RECORD_TYPE,
Collections.singletonList(recordStructFrom(record))))
+ dataChangeRecordType,
+ Collections.singletonList(recordStructFrom(record,
useJsonFields))))
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
@@ -158,18 +210,20 @@ public class TestStructMapper {
.build();
}
- private static Struct recordStructFrom(DataChangeRecord record) {
+ private static Struct recordStructFrom(DataChangeRecord record, boolean
useJsonFields) {
+ final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE :
COLUMN_TYPE_STRING_TYPE;
+ final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
final Value columnTypes =
Value.structArray(
- COLUMN_TYPE_TYPE,
+ columnTypeType,
record.getRowType().stream()
- .map(TestStructMapper::columnTypeStructFrom)
+ .map(rowType -> TestStructMapper.columnTypeStructFrom(rowType,
useJsonFields))
.collect(Collectors.toList()));
final Value mods =
Value.structArray(
- MOD_TYPE,
+ modType,
record.getMods().stream()
- .map(TestStructMapper::modStructFrom)
+ .map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
.collect(Collectors.toList()));
return Struct.newBuilder()
.set("commit_timestamp")
@@ -197,12 +251,16 @@ public class TestStructMapper {
.build();
}
- private static Struct columnTypeStructFrom(ColumnType columnType) {
+ private static Struct columnTypeStructFrom(ColumnType columnType, boolean
useJsonFields) {
+ final Value type =
+ useJsonFields
+ ? Value.json(columnType.getType().getCode())
+ : Value.string(columnType.getType().getCode());
return Struct.newBuilder()
.set("name")
.to(columnType.getName())
.set("type")
- .to(columnType.getType().getCode())
+ .to(type)
.set("is_primary_key")
.to(columnType.isPrimaryKey())
.set("ordinal_position")
@@ -210,14 +268,20 @@ public class TestStructMapper {
.build();
}
- private static Struct modStructFrom(Mod mod) {
+ private static Struct modStructFrom(Mod mod, boolean useJsonFields) {
+ final Value keys =
+ useJsonFields ? Value.json(mod.getKeysJson()) :
Value.string(mod.getKeysJson());
+ final Value newValues =
+ useJsonFields ? Value.json(mod.getNewValuesJson()) :
Value.string(mod.getNewValuesJson());
+ final Value oldValues =
+ useJsonFields ? Value.json(mod.getOldValuesJson()) :
Value.string(mod.getOldValuesJson());
return Struct.newBuilder()
.set("keys")
- .to(mod.getKeysJson())
+ .to(keys)
.set("new_values")
- .to(mod.getNewValuesJson())
+ .to(newValues)
.set("old_values")
- .to(mod.getOldValuesJson())
+ .to(oldValues)
.build();
}