This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 39c40f2 Merge pull request #16203 from [BEAM-12164] Add Spanner
Change Stream Mappers
39c40f2 is described below
commit 39c40f294c75616da8622f81ff959fa475a6fd20
Author: Thiago Nunes <[email protected]>
AuthorDate: Fri Jan 7 16:29:58 2022 +1100
Merge pull request #16203 from [BEAM-12164] Add Spanner Change Stream
Mappers
* [BEAM-12164] Add Spanner Partition Metadata DAOs
* fix: remove metrics table from DAO
* fix: fix compilation error
* chore: fix linting violations
* feat: add opencensus dependency
* [BEAM-12164] Add Spanner Change Stream Mappers
The mapper classes convert from Cloud Spanner Structs to the change
stream models used by the connector. There are two mappers implemented:
1. For mapping to partition metadata models.
2. For mapping to change stream records (one of heartbeat, data or child
partitions).
* deps: update OpenCensus API to 0.30.0
---
.../mapper/ChangeStreamRecordMapper.java | 324 +++++++++++++++++++++
.../changestreams/mapper/MapperFactory.java | 67 +++++
.../mapper/PartitionMetadataMapper.java | 98 +++++++
.../spanner/changestreams/mapper/package-info.java | 26 ++
.../mapper/ChangeStreamRecordMapperTest.java | 223 ++++++++++++++
.../mapper/PartitionMetadataMapperTest.java | 142 +++++++++
.../changestreams/util/TestStructMapper.java | 232 +++++++++++++++
7 files changed, 1112 insertions(+)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
new file mode 100644
index 0000000..db5b0ce
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/**
+ * This class is responsible for transforming a {@link Struct} to a {@link
List} of {@link
+ * ChangeStreamRecord} models.
+ */
+public class ChangeStreamRecordMapper {
+
+ private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
+ private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
+ private static final String CHILD_PARTITIONS_RECORD_COLUMN =
"child_partitions_record";
+
+ private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
+ private static final String SERVER_TRANSACTION_ID_COLUMN =
"server_transaction_id";
+ private static final String
IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN =
+ "is_last_record_in_transaction_in_partition";
+ private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
+ private static final String TABLE_NAME_COLUMN = "table_name";
+ private static final String COLUMN_TYPES_COLUMN = "column_types";
+ private static final String MODS_COLUMN = "mods";
+ private static final String MOD_TYPE_COLUMN = "mod_type";
+ private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
+ private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN =
+ "number_of_records_in_transaction";
+ private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN =
+ "number_of_partitions_in_transaction";
+ private static final String NAME_COLUMN = "name";
+ private static final String TYPE_COLUMN = "type";
+ private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
+ private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
+ private static final String KEYS_COLUMN = "keys";
+ private static final String OLD_VALUES_COLUMN = "old_values";
+ private static final String NEW_VALUES_COLUMN = "new_values";
+
+ private static final String TIMESTAMP_COLUMN = "timestamp";
+
+ private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
+ private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
+ private static final String PARENT_PARTITION_TOKENS_COLUMN =
"parent_partition_tokens";
+ private static final String TOKEN_COLUMN = "token";
+
+ ChangeStreamRecordMapper() {}
+
+ /**
+ * Transforms a {@link Struct} representing a change stream result into a
{@link List} of {@link
+ * ChangeStreamRecord} model. The type of the change stream record will be
identified and one of
+ * the following subclasses can be returned within the resulting {@link
List}:
+ *
+ * <ul>
+ * <li>{@link DataChangeRecord}
+ * <li>{@link HeartbeatRecord}
+ * <li>{@link ChildPartitionsRecord}
+ * </ul>
+ *
+ * Additionally to the {@link Struct} received, the originating partition of
the records (given by
+ * the {@link PartitionMetadata} parameter) and the stream metadata (given
by the {@link
+ * ChangeStreamResultSetMetadata}) are used to populate the {@link
ChangeStreamRecordMetadata} for
+ * each record mapped.
+ *
+ * <p>The {@link Struct} is expected to have the following fields:
+ *
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#DATA_CHANGE_RECORD_COLUMN}:
non-nullable {@link Struct}
+ * list of data change records.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#COMMIT_TIMESTAMP_COLUMN}:
non-nullable {@link
+ * Timestamp} representing the timestamp at which the
modifications within the record
+ * were committed in Cloud Spanner.
+ * <li>{@link
ChangeStreamRecordMapper#SERVER_TRANSACTION_ID_COLUMN}: non-nullable {@link
+ * String} representing the unique transaction id in which the
modifications for this
+ * record occurred.
+ * <li>{@link
ChangeStreamRecordMapper#IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN}:
+ * non-nullable {@link Boolean} indicating whether this record
is the last emitted for
+ * the transaction.
+ * <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}:
non-nullable {@link
+ * String} representing the order in which this record appears
within the context of a
+ * partition, commit timestamp and transaction.
+ * <li>{@link ChangeStreamRecordMapper#TABLE_NAME_COLUMN}:
non-nullable {@link String}
+ * representing the name of the table in which the modifications
for this record
+ * occurred.
+ * <li>{@link ChangeStreamRecordMapper#COLUMN_TYPES_COLUMN}:
non-nullable {@link List} of
+ * {@link Struct}s representing the type of the primary keys and
modified columns
+ * within this record.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#NAME_COLUMN}:
non-nullable {@link String}
+ * representing the name of a column.
+ * <li>{@link ChangeStreamRecordMapper#TYPE_COLUMN}:
non-nullable {@link String}
+ * representing the type of a column.
+ * <li>{@link ChangeStreamRecordMapper#IS_PRIMARY_KEY_COLUMN}:
non-nullable {@link
+ * Boolean} indicating if the column is part of the
primary key.
+ * <li>{@link
ChangeStreamRecordMapper#ORDINAL_POSITION_COLUMN}: non-nullable {@link
+ * Long} representing the position of the column in the
table it is defined.
+ * </ul>
+ * <li>{@link ChangeStreamRecordMapper#MODS_COLUMN}: non-nullable
{@link List} of {@link
+ * Struct}s representing the data modifications within this
record.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#KEYS_COLUMN}:
non-nullable {@link String}
+ * json object, where keys are the primary key column
names, and the values are
+ * their corresponding values.
+ * <li>{@link ChangeStreamRecordMapper#OLD_VALUES_COLUMN}:
nullable {@link String}
+ * json object displaying the old state of the columns
modified, where keys are
+ * the column names, and the values are their
corresponding values.
+ * <li>{@link ChangeStreamRecordMapper#NEW_VALUES_COLUMN}:
nullable {@link String}
+ * json object displaying the new state of the columns
modified, where keys are
+ * the column names, and the values are their
corresponding values.
+ * </ul>
+ * <li>{@link ChangeStreamRecordMapper#MOD_TYPE_COLUMN}:
non-nullable {@link String}
+ * representing the type of operation that caused the
modifications (see also {@link
+ * ModType}.
+ * <li>{@link ChangeStreamRecordMapper#VALUE_CAPTURE_TYPE_COLUMN}:
non-nullable {@link
+ * String} representing the capture type of the change stream
that generated this
+ * record (see also {@link ValueCaptureType}).
+ * <li>{@link
ChangeStreamRecordMapper#NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN}:
+ * non-nullable {@link Long} representing the total number of
data change records for
+ * the transaction in which this record occurred.
+ * <li>{@link
ChangeStreamRecordMapper#NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN}:
+ * non-nullable {@link Long} representing the total number of
partitions for the
+ * transaction in which this record occurred.
+ * </ul>
+ * <li>{@link ChangeStreamRecordMapper#HEARTBEAT_RECORD_COLUMN}:
non-nullable {@link Struct}
+ * list of hearbeat records.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#TIMESTAMP_COLUMN}:
non-nullable {@link Timestamp}
+ * representing the timestamp for which the change stream query
has returned all
+ * changes (see more in {@link HeartbeatRecord#getTimestamp()}.
+ * </ul>
+ * <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_RECORD_COLUMN}:
non-nullable {@link
+ * Struct} list of child partitions records.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#START_TIMESTAMP_COLUMN}:
non-nullable {@link
+ * Timestamp} representing the timestamp at which the new
partition started being
+ * valid in Cloud Spanner.
+ * <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}:
non-nullable {@link
+ * String} representing the order in which this record appears
within the context of a
+ * partition and commit timestamp.
+ * <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_COLUMN}:
non-nullable {@link List}
+ * of {@link Struct} representing the new child partitions.
+ * <ul>
+ * <li>{@link ChangeStreamRecordMapper#TOKEN_COLUMN}:
non-nullable {@link String}
+ * representing the unique identifier of the new child
partition.
+ * <li>{@link
ChangeStreamRecordMapper#PARENT_PARTITION_TOKENS_COLUMN}: non-nullable
+ * {@link List} of {@link String} representing the unique
identifier(s) of
+ * parent partition(s) where this child partition
originated from.
+ * </ul>
+ * </ul>
+ * </ul>
+ *
+ * @param partition the partition metadata from which the row was generated
+ * @param row the struct row, representing a single change stream result (it
may contain multiple
+ * change stream records within)
+ * @param resultSetMetadata the metadata generated when reading the change
stream row
+ * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+ */
+ public List<ChangeStreamRecord> toChangeStreamRecords(
+ PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata
resultSetMetadata) {
+ return row.getStructList(0).stream()
+ .flatMap(struct -> toChangeStreamRecord(partition, struct,
resultSetMetadata))
+ .collect(Collectors.toList());
+ }
+
+ private Stream<ChangeStreamRecord> toChangeStreamRecord(
+ PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata
resultSetMetadata) {
+
+ final Stream<DataChangeRecord> dataChangeRecords =
+ row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+ .filter(this::isNonNullDataChangeRecord)
+ .map(struct -> toDataChangeRecord(partition, struct,
resultSetMetadata));
+
+ final Stream<HeartbeatRecord> heartbeatRecords =
+ row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+ .filter(this::isNonNullHeartbeatRecord)
+ .map(struct -> toHeartbeatRecord(partition, struct,
resultSetMetadata));
+
+ final Stream<ChildPartitionsRecord> childPartitionsRecords =
+ row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+ .filter(this::isNonNullChildPartitionsRecord)
+ .map(struct -> toChildPartitionsRecord(partition, struct,
resultSetMetadata));
+
+ return Stream.concat(
+ Stream.concat(dataChangeRecords, heartbeatRecords),
childPartitionsRecords);
+ }
+
+ private boolean isNonNullDataChangeRecord(Struct row) {
+ return !row.isNull(COMMIT_TIMESTAMP_COLUMN);
+ }
+
+ private boolean isNonNullHeartbeatRecord(Struct row) {
+ return !row.isNull(TIMESTAMP_COLUMN);
+ }
+
+ private boolean isNonNullChildPartitionsRecord(Struct row) {
+ return !row.isNull(START_TIMESTAMP_COLUMN);
+ }
+
+ private DataChangeRecord toDataChangeRecord(
+ PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata
resultSetMetadata) {
+ final Timestamp commitTimestamp =
row.getTimestamp(COMMIT_TIMESTAMP_COLUMN);
+ return new DataChangeRecord(
+ partition.getPartitionToken(),
+ commitTimestamp,
+ row.getString(SERVER_TRANSACTION_ID_COLUMN),
+ row.getBoolean(IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN),
+ row.getString(RECORD_SEQUENCE_COLUMN),
+ row.getString(TABLE_NAME_COLUMN),
+ row.getStructList(COLUMN_TYPES_COLUMN).stream()
+ .map(this::columnTypeFrom)
+ .collect(Collectors.toList()),
+
row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()),
+ ModType.valueOf(row.getString(MOD_TYPE_COLUMN)),
+ ValueCaptureType.valueOf(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
+ row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN),
+ row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN),
+ changeStreamRecordMetadataFrom(partition, commitTimestamp,
resultSetMetadata));
+ }
+
+ private HeartbeatRecord toHeartbeatRecord(
+ PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata
resultSetMetadata) {
+ final Timestamp timestamp = row.getTimestamp(TIMESTAMP_COLUMN);
+
+ return new HeartbeatRecord(
+ timestamp, changeStreamRecordMetadataFrom(partition, timestamp,
resultSetMetadata));
+ }
+
+ private ChildPartitionsRecord toChildPartitionsRecord(
+ PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata
resultSetMetadata) {
+ final Timestamp startTimestamp = row.getTimestamp(START_TIMESTAMP_COLUMN);
+
+ return new ChildPartitionsRecord(
+ startTimestamp,
+ row.getString(RECORD_SEQUENCE_COLUMN),
+ row.getStructList(CHILD_PARTITIONS_COLUMN).stream()
+ .map(struct -> childPartitionFrom(partition.getPartitionToken(),
struct))
+ .collect(Collectors.toList()),
+ changeStreamRecordMetadataFrom(partition, startTimestamp,
resultSetMetadata));
+ }
+
+ private ColumnType columnTypeFrom(Struct struct) {
+ return new ColumnType(
+ struct.getString(NAME_COLUMN),
+ new TypeCode(struct.getString(TYPE_COLUMN)),
+ struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
+ struct.getLong(ORDINAL_POSITION_COLUMN));
+ }
+
+ private Mod modFrom(Struct struct) {
+ final String keysJson = struct.getString(KEYS_COLUMN);
+ final String oldValuesJson =
+ struct.isNull(OLD_VALUES_COLUMN) ? null :
struct.getString(OLD_VALUES_COLUMN);
+ final String newValuesJson =
+ struct.isNull(NEW_VALUES_COLUMN)
+ ? null
+ : struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN);
+ return new Mod(keysJson, oldValuesJson, newValuesJson);
+ }
+
+ private ChildPartition childPartitionFrom(String partitionToken, Struct
struct) {
+ final HashSet<String> parentTokens =
+ Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
+ if (InitialPartition.isInitialPartition(partitionToken)) {
+ parentTokens.add(partitionToken);
+ }
+ return new ChildPartition(struct.getString(TOKEN_COLUMN), parentTokens);
+ }
+
+ private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(
+ PartitionMetadata partition,
+ Timestamp recordTimestamp,
+ ChangeStreamResultSetMetadata resultSetMetadata) {
+ return ChangeStreamRecordMetadata.newBuilder()
+ .withRecordTimestamp(recordTimestamp)
+ .withPartitionToken(partition.getPartitionToken())
+ .withPartitionStartTimestamp(partition.getStartTimestamp())
+ .withPartitionEndTimestamp(partition.getEndTimestamp())
+ .withPartitionCreatedAt(partition.getCreatedAt())
+ .withPartitionScheduledAt(partition.getScheduledAt())
+ .withPartitionRunningAt(partition.getRunningAt())
+ .withQueryStartedAt(resultSetMetadata.getQueryStartedAt())
+
.withRecordStreamStartedAt(resultSetMetadata.getRecordStreamStartedAt())
+ .withRecordStreamEndedAt(resultSetMetadata.getRecordStreamEndedAt())
+ .withRecordReadAt(resultSetMetadata.getRecordReadAt())
+
.withTotalStreamTimeMillis(resultSetMetadata.getTotalStreamDuration().getMillis())
+ .withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
+ .build();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
new file mode 100644
index 0000000..5227463
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import java.io.Serializable;
+
+/**
+ * Factory class for creating instances that will map a struct to a connector
model. The instances
+ * created are all singletons.
+ */
+// static fields are un-initialized, because we start them during the first
fetch call (with the
+// singleton pattern)
+@SuppressWarnings("initialization.static.fields.uninitialized")
+public class MapperFactory implements Serializable {
+
+ private static final long serialVersionUID = -813434573067800902L;
+
+ private static ChangeStreamRecordMapper changeStreamRecordMapperInstance;
+ private static PartitionMetadataMapper partitionMetadataMapperInstance;
+
+ /**
+ * Creates and returns a singleton instance of a mapper class capable of
transforming a {@link
+ * com.google.cloud.spanner.Struct} into a {@link java.util.List} of {@link
+ *
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord}
subclasses.
+ *
+ * <p>This method is thread safe.
+ *
+ * @return singleton instance of the {@link ChangeStreamRecordMapper}
+ */
+ public synchronized ChangeStreamRecordMapper changeStreamRecordMapper() {
+ if (changeStreamRecordMapperInstance == null) {
+ changeStreamRecordMapperInstance = new ChangeStreamRecordMapper();
+ }
+ return changeStreamRecordMapperInstance;
+ }
+
+ /**
+ * Creates and returns a single instance of a mapper class capable of
transforming a {@link
+ * com.google.cloud.spanner.Struct} into a {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata}
class.
+ *
+ * <p>This method is thread safe.
+ *
+ * @return singleton instance of the {@link PartitionMetadataMapper}
+ */
+ public synchronized PartitionMetadataMapper partitionMetadataMapper() {
+ if (partitionMetadataMapperInstance == null) {
+ partitionMetadataMapperInstance = new PartitionMetadataMapper();
+ }
+ return partitionMetadataMapperInstance;
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
new file mode 100644
index 0000000..1e0ece1
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.List;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/** This class is responsible for transforming a {@link Struct} to a {@link
PartitionMetadata}. */
+public class PartitionMetadataMapper {
+
+ PartitionMetadataMapper() {}
+
+ /**
+ * Transforms a {@link Struct} representing a partition metadata row into a
{@link
+ * PartitionMetadata} model. The {@link Struct} is expected to have the
following fields:
+ *
+ * <ul>
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_PARTITION_TOKEN}:
non-nullable {@link String}
+ * representing the partition unique identifier.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_PARENT_TOKENS}:
non-nullable {@link List} of
+ * {@link String} representing the partition parents' unique
identifiers.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_START_TIMESTAMP}:
non-nullable {@link Timestamp}
+ * representing the start time at which the partition started existing
in Cloud Spanner.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_END_TIMESTAMP}: nullable
{@link Timestamp}
+ * representing the end time for querying this partition.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_HEARTBEAT_MILLIS}:
non-nullable {@link Long}
+ * representing the number of milliseconds after the stream is idle,
which a heartbeat
+ * record will be emitted.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_STATE}: non-nullable {@link
String} representing
+ * the {@link State} in which the partition is in.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_WATERMARK}: non-nullable
{@link Timestamp}
+ * representing the time for which all records with a timestamp less
than it have been
+ * processed.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_CREATED_AT}: non-nullable
{@link Timestamp}
+ * representing the time at which this partition was first detected.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_SCHEDULED_AT}: nullable
{@link Timestamp}
+ * representing the time at which this partition was scheduled to be
queried.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_RUNNING_AT}: nullable
{@link Timestamp}
+ * representing the time at which the connector started processing
this partition.
+ * <li>{@link PartitionMetadataAdminDao#COLUMN_FINISHED_AT}: nullable
{@link Timestamp}
+ * representing the time at which the connector finished processing
this partition.
+ * </ul>
+ *
+ * @param row the {@link Struct} row to be converted. It should contain all
the fields as
+ * specified above.
+ * @return a {@link PartitionMetadata} with the mapped {@link Struct} field
values.
+ */
+ public PartitionMetadata from(Struct row) {
+ return PartitionMetadata.newBuilder()
+ .setPartitionToken(row.getString(COLUMN_PARTITION_TOKEN))
+
.setParentTokens(Sets.newHashSet(row.getStringList(COLUMN_PARENT_TOKENS)))
+ .setStartTimestamp(row.getTimestamp(COLUMN_START_TIMESTAMP))
+ .setEndTimestamp(
+ !row.isNull(COLUMN_END_TIMESTAMP) ?
row.getTimestamp(COLUMN_END_TIMESTAMP) : null)
+ .setHeartbeatMillis(row.getLong(COLUMN_HEARTBEAT_MILLIS))
+ .setState(State.valueOf(row.getString(COLUMN_STATE)))
+ .setWatermark(row.getTimestamp(COLUMN_WATERMARK))
+ .setCreatedAt(row.getTimestamp(COLUMN_CREATED_AT))
+ .setScheduledAt(
+ !row.isNull(COLUMN_SCHEDULED_AT) ?
row.getTimestamp(COLUMN_SCHEDULED_AT) : null)
+ .setRunningAt(!row.isNull(COLUMN_RUNNING_AT) ?
row.getTimestamp(COLUMN_RUNNING_AT) : null)
+ .setFinishedAt(
+ !row.isNull(COLUMN_FINISHED_AT) ?
row.getTimestamp(COLUMN_FINISHED_AT) : null)
+ .build();
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
new file mode 100644
index 0000000..7f918fd
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mapping related functionality, such as from {@link
com.google.cloud.spanner.ResultSet}s to Change
+ * Stream models.
+ */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
new file mode 100644
index 0000000..26609f6
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChangeStreamRecordMapperTest {
+
+  private ChangeStreamRecordMapper mapper;
+  private PartitionMetadata partition;
+  private ChangeStreamResultSetMetadata resultSetMetadata;
+
+  /** Creates a fresh mapper, a running partition and stubbed result set metadata for each test. */
+  @Before
+  public void setUp() {
+    mapper = new ChangeStreamRecordMapper();
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken("partitionToken")
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setHeartbeatMillis(30_000L)
+            .setState(State.RUNNING)
+            .setWatermark(Timestamp.ofTimeMicroseconds(10L))
+            .setStartTimestamp(Timestamp.ofTimeMicroseconds(11L))
+            .setEndTimestamp(Timestamp.ofTimeMicroseconds(12L))
+            .setCreatedAt(Timestamp.ofTimeMicroseconds(13L))
+            .setScheduledAt(Timestamp.ofTimeMicroseconds(14L))
+            .setRunningAt(Timestamp.ofTimeMicroseconds(15L))
+            .build();
+    resultSetMetadata = stubResultSetMetadata();
+  }
+
+  @Test
+  public void testMappingUpdateStructRowToDataChangeRecord() {
+    final DataChangeRecord expected =
+        dataChangeRecord(
+            "serverTransactionId",
+            true,
+            new Mod(
+                "{\"column1\": \"value1\"}",
+                "{\"column2\": \"oldValue2\"}",
+                "{\"column2\": \"newValue2\"}"),
+            ModType.UPDATE);
+    final Struct row = recordsToStruct(expected);
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(partition, row, resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingInsertStructRowToDataChangeRecord() {
+    final DataChangeRecord expected =
+        dataChangeRecord(
+            "transactionId",
+            false,
+            new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}"),
+            ModType.INSERT);
+    final Struct row = recordsToStruct(expected);
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(partition, row, resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingDeleteStructRowToDataChangeRecord() {
+    final DataChangeRecord expected =
+        dataChangeRecord(
+            "transactionId",
+            false,
+            new Mod("{\"column1\": \"value1\"}", "{\"column2\": \"oldValue2\"}", null),
+            ModType.DELETE);
+    final Struct row = recordsToStruct(expected);
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(partition, row, resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingStructRowToHeartbeatRecord() {
+    final HeartbeatRecord expected =
+        new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+    final Struct row = recordsToStruct(expected);
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(partition, row, resultSetMetadata));
+  }
+
+  @Test
+  public void testMappingStructRowToChildPartitionRecord() {
+    final ChildPartitionsRecord expected =
+        new ChildPartitionsRecord(
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "1",
+            Arrays.asList(
+                new ChildPartition("childToken1", Sets.newHashSet("parentToken1", "parentToken2")),
+                new ChildPartition("childToken2", Sets.newHashSet("parentToken1", "parentToken2"))),
+            null);
+    final Struct row = recordsToStruct(expected);
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(partition, row, resultSetMetadata));
+  }
+
+  /**
+   * Checks the mapping of a row read from the initial partition. The child partitions in the row
+   * carry no parent tokens. In the mapped record, each child partition must have {@link
+   * InitialPartition#PARTITION_TOKEN} as its parent.
+   */
+  @Test
+  public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
+    final Struct row =
+        recordsToStruct(
+            new ChildPartitionsRecord(
+                Timestamp.ofTimeSecondsAndNanos(10L, 20),
+                "1",
+                Arrays.asList(
+                    new ChildPartition("childToken1", Sets.newHashSet()),
+                    new ChildPartition("childToken2", Sets.newHashSet())),
+                null));
+    final ChildPartitionsRecord expected =
+        new ChildPartitionsRecord(
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "1",
+            Arrays.asList(
+                new ChildPartition(
+                    "childToken1", Sets.newHashSet(InitialPartition.PARTITION_TOKEN)),
+                new ChildPartition(
+                    "childToken2", Sets.newHashSet(InitialPartition.PARTITION_TOKEN))),
+            null);
+    final PartitionMetadata initialPartition =
+        partition.toBuilder().setPartitionToken(InitialPartition.PARTITION_TOKEN).build();
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(initialPartition, row, resultSetMetadata));
+  }
+
+  /**
+   * Builds the data change record shared by the data change tests. The fixed parts are the
+   * partition token, commit timestamp, record sequence "1", table "tableName", two columns,
+   * OLD_AND_NEW_VALUES capture type and the transaction counters. Only the fields that differ
+   * between tests are parameters.
+   */
+  private static DataChangeRecord dataChangeRecord(
+      String serverTransactionId,
+      boolean isLastRecordInTransactionInPartition,
+      Mod mod,
+      ModType modType) {
+    return new DataChangeRecord(
+        "partitionToken",
+        Timestamp.ofTimeSecondsAndNanos(10L, 20),
+        serverTransactionId,
+        isLastRecordInTransactionInPartition,
+        "1",
+        "tableName",
+        Arrays.asList(
+            new ColumnType("column1", new TypeCode("type1"), true, 1L),
+            new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+        Collections.singletonList(mod),
+        modType,
+        ValueCaptureType.OLD_AND_NEW_VALUES,
+        10L,
+        2L,
+        null);
+  }
+
+  /** Returns a mock result set metadata with each of its queried getters stubbed to a fixed value. */
+  private static ChangeStreamResultSetMetadata stubResultSetMetadata() {
+    final ChangeStreamResultSetMetadata metadata = mock(ChangeStreamResultSetMetadata.class);
+    when(metadata.getQueryStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(1L));
+    when(metadata.getRecordStreamStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(2L));
+    when(metadata.getRecordStreamEndedAt()).thenReturn(Timestamp.ofTimeMicroseconds(3L));
+    when(metadata.getRecordReadAt()).thenReturn(Timestamp.ofTimeMicroseconds(4L));
+    when(metadata.getTotalStreamDuration()).thenReturn(Duration.millis(100));
+    when(metadata.getNumberOfRecordsRead()).thenReturn(10_000L);
+    return metadata;
+  }
+
+  // TODO: Add test case for unknown record type
+  // TODO: Add test case for malformed record
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java
new file mode 100644
index 0000000..b95dd52
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Collections;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PartitionMetadataMapperTest {
+
+ private PartitionMetadataMapper mapper;
+
+ @Before
+ public void setUp() {
+ mapper = new PartitionMetadataMapper();
+ }
+
+ @Test
+ public void testMapPartitionMetadataFromResultSet() {
+ final Struct row =
+ Struct.newBuilder()
+ .set(COLUMN_PARTITION_TOKEN)
+ .to("token")
+ .set(COLUMN_PARENT_TOKENS)
+ .toStringArray(Collections.singletonList("parentToken"))
+ .set(COLUMN_START_TIMESTAMP)
+ .to(Timestamp.ofTimeMicroseconds(10L))
+ .set(COLUMN_END_TIMESTAMP)
+ .to(Timestamp.ofTimeMicroseconds(20L))
+ .set(COLUMN_HEARTBEAT_MILLIS)
+ .to(5_000L)
+ .set(COLUMN_STATE)
+ .to(State.RUNNING.name())
+ .set(COLUMN_WATERMARK)
+ .to(Timestamp.ofTimeMicroseconds(30L))
+ .set(COLUMN_CREATED_AT)
+ .to(Timestamp.ofTimeMicroseconds(40L))
+ .set(COLUMN_SCHEDULED_AT)
+ .to(Timestamp.ofTimeMicroseconds(50L))
+ .set(COLUMN_RUNNING_AT)
+ .to(Timestamp.ofTimeMicroseconds(60L))
+ .set(COLUMN_FINISHED_AT)
+ .to(Timestamp.ofTimeMicroseconds(70L))
+ .build();
+
+ final PartitionMetadata partition = mapper.from(row);
+
+ assertEquals(
+ new PartitionMetadata(
+ "token",
+ Sets.newHashSet("parentToken"),
+ Timestamp.ofTimeMicroseconds(10L),
+ Timestamp.ofTimeMicroseconds(20L),
+ 5_000L,
+ State.RUNNING,
+ Timestamp.ofTimeMicroseconds(30),
+ Timestamp.ofTimeMicroseconds(40),
+ Timestamp.ofTimeMicroseconds(50),
+ Timestamp.ofTimeMicroseconds(60),
+ Timestamp.ofTimeMicroseconds(70)),
+ partition);
+ }
+
+ @Test
+ public void testMapPartitionMetadataFromResultSetWithNulls() {
+ final Struct row =
+ Struct.newBuilder()
+ .set(COLUMN_PARTITION_TOKEN)
+ .to("token")
+ .set(COLUMN_PARENT_TOKENS)
+ .toStringArray(Collections.singletonList("parentToken"))
+ .set(COLUMN_START_TIMESTAMP)
+ .to(Timestamp.ofTimeMicroseconds(10L))
+ .set(COLUMN_END_TIMESTAMP)
+ .to((Timestamp) null)
+ .set(COLUMN_HEARTBEAT_MILLIS)
+ .to(5_000L)
+ .set(COLUMN_STATE)
+ .to(State.CREATED.name())
+ .set(COLUMN_WATERMARK)
+ .to(Timestamp.ofTimeMicroseconds(30L))
+ .set(COLUMN_CREATED_AT)
+ .to(Timestamp.ofTimeMicroseconds(40L))
+ .set(COLUMN_SCHEDULED_AT)
+ .to((Timestamp) null)
+ .set(COLUMN_RUNNING_AT)
+ .to((Timestamp) null)
+ .set(COLUMN_FINISHED_AT)
+ .to((Timestamp) null)
+ .build();
+
+ final PartitionMetadata partition = mapper.from(row);
+
+ assertEquals(
+ new PartitionMetadata(
+ "token",
+ Sets.newHashSet("parentToken"),
+ Timestamp.ofTimeMicroseconds(10L),
+ null,
+ 5_000L,
+ State.CREATED,
+ Timestamp.ofTimeMicroseconds(30),
+ Timestamp.ofTimeMicroseconds(40),
+ null,
+ null,
+ null),
+ partition);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
new file mode 100644
index 0000000..b699e0a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.util;
+
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Type.StructField;
+import com.google.cloud.spanner.Value;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+
+public class TestStructMapper {
+
+ private static final Type CHILD_PARTITION_TYPE =
+ Type.struct(
+ StructField.of("token", Type.string()),
+ StructField.of("parent_partition_tokens",
Type.array(Type.string())));
+ private static final Type COLUMN_TYPE_TYPE =
+ Type.struct(
+ StructField.of("name", Type.string()),
+ StructField.of("type", Type.string()),
+ StructField.of("is_primary_key", Type.bool()),
+ StructField.of("ordinal_position", Type.int64()));
+ private static final Type MOD_TYPE =
+ Type.struct(
+ StructField.of("keys", Type.string()),
+ StructField.of("new_values", Type.string()),
+ StructField.of("old_values", Type.string()));
+ private static final Type DATA_CHANGE_RECORD_TYPE =
+ Type.struct(
+ StructField.of("commit_timestamp", Type.timestamp()),
+ StructField.of("record_sequence", Type.string()),
+ StructField.of("server_transaction_id", Type.string()),
+ StructField.of("is_last_record_in_transaction_in_partition",
Type.bool()),
+ StructField.of("table_name", Type.string()),
+ StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
+ StructField.of("mods", Type.array(MOD_TYPE)),
+ StructField.of("mod_type", Type.string()),
+ StructField.of("value_capture_type", Type.string()),
+ StructField.of("number_of_records_in_transaction", Type.int64()),
+ StructField.of("number_of_partitions_in_transaction", Type.int64()));
+ private static final Type HEARTBEAT_RECORD_TYPE =
+ Type.struct(StructField.of("timestamp", Type.timestamp()));
+ private static final Type CHILD_PARTITIONS_RECORD_TYPE =
+ Type.struct(
+ StructField.of("start_timestamp", Type.timestamp()),
+ StructField.of("record_sequence", Type.string()),
+ StructField.of("child_partitions",
Type.array(CHILD_PARTITION_TYPE)));
+ private static final Type STREAM_RECORD_TYPE =
+ Type.struct(
+ StructField.of("data_change_record",
Type.array(DATA_CHANGE_RECORD_TYPE)),
+ StructField.of("heartbeat_record",
Type.array(HEARTBEAT_RECORD_TYPE)),
+ StructField.of("child_partitions_record",
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+
+ public static Struct recordsToStruct(ChangeStreamRecord... records) {
+ return Struct.newBuilder()
+ .add(
+ Value.structArray(
+ STREAM_RECORD_TYPE,
+ Arrays.stream(records)
+ .map(TestStructMapper::streamRecordStructFrom)
+ .collect(Collectors.toList())))
+ .build();
+ }
+
+ private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
+ if (record instanceof DataChangeRecord) {
+ return streamRecordStructFrom((DataChangeRecord) record);
+ } else if (record instanceof HeartbeatRecord) {
+ return streamRecordStructFrom((HeartbeatRecord) record);
+ } else if (record instanceof ChildPartitionsRecord) {
+ return streamRecordStructFrom((ChildPartitionsRecord) record);
+ } else {
+ throw new UnsupportedOperationException("Unimplemented mapping for " +
record.getClass());
+ }
+ }
+
+ private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
+ return Struct.newBuilder()
+ .set("data_change_record")
+ .to(Value.structArray(DATA_CHANGE_RECORD_TYPE,
Collections.emptyList()))
+ .set("heartbeat_record")
+ .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
+ .set("child_partitions_record")
+ .to(
+ Value.structArray(
+ CHILD_PARTITIONS_RECORD_TYPE,
Collections.singletonList(recordStructFrom(record))))
+ .build();
+ }
+
+ private static Struct recordStructFrom(ChildPartitionsRecord record) {
+ final Value childPartitions =
+ Value.structArray(
+ CHILD_PARTITION_TYPE,
+ record.getChildPartitions().stream()
+ .map(TestStructMapper::childPartitionFrom)
+ .collect(Collectors.toList()));
+ return Struct.newBuilder()
+ .set("start_timestamp")
+ .to(record.getStartTimestamp())
+ .set("record_sequence")
+ .to(record.getRecordSequence())
+ .set("child_partitions")
+ .to(childPartitions)
+ .build();
+ }
+
+ private static Struct streamRecordStructFrom(HeartbeatRecord record) {
+ return Struct.newBuilder()
+ .set("data_change_record")
+ .to(Value.structArray(DATA_CHANGE_RECORD_TYPE,
Collections.emptyList()))
+ .set("heartbeat_record")
+ .to(
+ Value.structArray(
+ HEARTBEAT_RECORD_TYPE,
Collections.singletonList(recordStructFrom(record))))
+ .set("child_partitions_record")
+ .to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE,
Collections.emptyList()))
+ .build();
+ }
+
+ private static Struct recordStructFrom(HeartbeatRecord record) {
+ return
Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
+ }
+
+ private static Struct streamRecordStructFrom(DataChangeRecord record) {
+ return Struct.newBuilder()
+ .set("data_change_record")
+ .to(
+ Value.structArray(
+ DATA_CHANGE_RECORD_TYPE,
Collections.singletonList(recordStructFrom(record))))
+ .set("heartbeat_record")
+ .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
+ .set("child_partitions_record")
+ .to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE,
Collections.emptyList()))
+ .build();
+ }
+
+ private static Struct recordStructFrom(DataChangeRecord record) {
+ final Value columnTypes =
+ Value.structArray(
+ COLUMN_TYPE_TYPE,
+ record.getRowType().stream()
+ .map(TestStructMapper::columnTypeStructFrom)
+ .collect(Collectors.toList()));
+ final Value mods =
+ Value.structArray(
+ MOD_TYPE,
+ record.getMods().stream()
+ .map(TestStructMapper::modStructFrom)
+ .collect(Collectors.toList()));
+ return Struct.newBuilder()
+ .set("commit_timestamp")
+ .to(record.getCommitTimestamp())
+ .set("record_sequence")
+ .to(record.getRecordSequence())
+ .set("server_transaction_id")
+ .to(record.getServerTransactionId())
+ .set("is_last_record_in_transaction_in_partition")
+ .to(record.isLastRecordInTransactionInPartition())
+ .set("table_name")
+ .to(record.getTableName())
+ .set("column_types")
+ .to(columnTypes)
+ .set("mods")
+ .to(mods)
+ .set("mod_type")
+ .to(record.getModType().toString())
+ .set("value_capture_type")
+ .to(record.getValueCaptureType().toString())
+ .set("number_of_records_in_transaction")
+ .to(record.getNumberOfRecordsInTransaction())
+ .set("number_of_partitions_in_transaction")
+ .to(record.getNumberOfPartitionsInTransaction())
+ .build();
+ }
+
+ private static Struct columnTypeStructFrom(ColumnType columnType) {
+ return Struct.newBuilder()
+ .set("name")
+ .to(columnType.getName())
+ .set("type")
+ .to(columnType.getType().getCode())
+ .set("is_primary_key")
+ .to(columnType.isPrimaryKey())
+ .set("ordinal_position")
+ .to(columnType.getOrdinalPosition())
+ .build();
+ }
+
+ private static Struct modStructFrom(Mod mod) {
+ return Struct.newBuilder()
+ .set("keys")
+ .to(mod.getKeysJson())
+ .set("new_values")
+ .to(mod.getNewValuesJson())
+ .set("old_values")
+ .to(mod.getOldValuesJson())
+ .build();
+ }
+
+ private static Struct childPartitionFrom(ChildPartition childPartition) {
+ return Struct.newBuilder()
+ .set("token")
+ .to(childPartition.getToken())
+ .set("parent_partition_tokens")
+ .to(Value.stringArray(childPartition.getParentTokens()))
+ .build();
+ }
+}