This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 39c40f2  Merge pull request #16203 from [BEAM-12164] Add Spanner 
Change Stream Mappers
39c40f2 is described below

commit 39c40f294c75616da8622f81ff959fa475a6fd20
Author: Thiago Nunes <[email protected]>
AuthorDate: Fri Jan 7 16:29:58 2022 +1100

    Merge pull request #16203 from [BEAM-12164] Add Spanner Change Stream 
Mappers
    
    * [BEAM-12164] Add Spanner Partition Metadata DAOs
    
    * fix: remove metrics table from DAO
    
    * fix: fix compilation error
    
    * chore: fix linting violations
    
    * feat: add opencensus dependency
    
    * [BEAM-12164] Add Spanner Change Stream Mappers
    
    The mapper classes convert from Cloud Spanner Structs to the change
    stream models used by the connector. There are two mappers implemented:
    
    1. For mapping to partition metadata models.
    2. For mapping to change stream records (one of heartbeat, data or child
       partitions).
    
    * deps: update OpenCensus API to 0.30.0
---
 .../mapper/ChangeStreamRecordMapper.java           | 324 +++++++++++++++++++++
 .../changestreams/mapper/MapperFactory.java        |  67 +++++
 .../mapper/PartitionMetadataMapper.java            |  98 +++++++
 .../spanner/changestreams/mapper/package-info.java |  26 ++
 .../mapper/ChangeStreamRecordMapperTest.java       | 223 ++++++++++++++
 .../mapper/PartitionMetadataMapperTest.java        | 142 +++++++++
 .../changestreams/util/TestStructMapper.java       | 232 +++++++++++++++
 7 files changed, 1112 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
new file mode 100644
index 0000000..db5b0ce
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecordMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/**
+ * This class is responsible for transforming a {@link Struct} to a {@link 
List} of {@link
+ * ChangeStreamRecord} models.
+ */
+public class ChangeStreamRecordMapper {
+
+  private static final String DATA_CHANGE_RECORD_COLUMN = "data_change_record";
+  private static final String HEARTBEAT_RECORD_COLUMN = "heartbeat_record";
+  private static final String CHILD_PARTITIONS_RECORD_COLUMN = 
"child_partitions_record";
+
+  private static final String COMMIT_TIMESTAMP_COLUMN = "commit_timestamp";
+  private static final String SERVER_TRANSACTION_ID_COLUMN = 
"server_transaction_id";
+  private static final String 
IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN =
+      "is_last_record_in_transaction_in_partition";
+  private static final String RECORD_SEQUENCE_COLUMN = "record_sequence";
+  private static final String TABLE_NAME_COLUMN = "table_name";
+  private static final String COLUMN_TYPES_COLUMN = "column_types";
+  private static final String MODS_COLUMN = "mods";
+  private static final String MOD_TYPE_COLUMN = "mod_type";
+  private static final String VALUE_CAPTURE_TYPE_COLUMN = "value_capture_type";
+  private static final String NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN =
+      "number_of_records_in_transaction";
+  private static final String NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN =
+      "number_of_partitions_in_transaction";
+  private static final String NAME_COLUMN = "name";
+  private static final String TYPE_COLUMN = "type";
+  private static final String IS_PRIMARY_KEY_COLUMN = "is_primary_key";
+  private static final String ORDINAL_POSITION_COLUMN = "ordinal_position";
+  private static final String KEYS_COLUMN = "keys";
+  private static final String OLD_VALUES_COLUMN = "old_values";
+  private static final String NEW_VALUES_COLUMN = "new_values";
+
+  private static final String TIMESTAMP_COLUMN = "timestamp";
+
+  private static final String START_TIMESTAMP_COLUMN = "start_timestamp";
+  private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
+  private static final String PARENT_PARTITION_TOKENS_COLUMN = 
"parent_partition_tokens";
+  private static final String TOKEN_COLUMN = "token";
+
+  ChangeStreamRecordMapper() {}
+
+  /**
+   * Transforms a {@link Struct} representing a change stream result into a 
{@link List} of {@link
+   * ChangeStreamRecord} models. The type of the change stream record will be 
identified and one of
+   * the following subclasses can be returned within the resulting {@link 
List}:
+   *
+   * <ul>
+   *   <li>{@link DataChangeRecord}
+   *   <li>{@link HeartbeatRecord}
+   *   <li>{@link ChildPartitionsRecord}
+   * </ul>
+   *
+   * In addition to the {@link Struct} received, the originating partition of 
the records (given by
+   * the {@link PartitionMetadata} parameter) and the stream metadata (given 
by the {@link
+   * ChangeStreamResultSetMetadata}) are used to populate the {@link 
ChangeStreamRecordMetadata} for
+   * each record mapped.
+   *
+   * <p>The {@link Struct} is expected to have the following fields:
+   *
+   * <ul>
+   *   <li>{@link ChangeStreamRecordMapper#DATA_CHANGE_RECORD_COLUMN}: 
non-nullable {@link Struct}
+   *       list of data change records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#COMMIT_TIMESTAMP_COLUMN}: 
non-nullable {@link
+   *             Timestamp} representing the timestamp at which the 
modifications within the record
+   *             were committed in Cloud Spanner.
+   *         <li>{@link 
ChangeStreamRecordMapper#SERVER_TRANSACTION_ID_COLUMN}: non-nullable {@link
+   *             String} representing the unique transaction id in which the 
modifications for this
+   *             record occurred.
+   *         <li>{@link 
ChangeStreamRecordMapper#IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN}:
+   *             non-nullable {@link Boolean} indicating whether this record 
is the last emitted for
+   *             the transaction.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: 
non-nullable {@link
+   *             String} representing the order in which this record appears 
within the context of a
+   *             partition, commit timestamp and transaction.
+   *         <li>{@link ChangeStreamRecordMapper#TABLE_NAME_COLUMN}: 
non-nullable {@link String}
+   *             representing the name of the table in which the modifications 
for this record
+   *             occurred.
+   *         <li>{@link ChangeStreamRecordMapper#COLUMN_TYPES_COLUMN}: 
non-nullable {@link List} of
+   *             {@link Struct}s representing the type of the primary keys and 
modified columns
+   *             within this record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#NAME_COLUMN}: 
non-nullable {@link String}
+   *                   representing the name of a column.
+   *               <li>{@link ChangeStreamRecordMapper#TYPE_COLUMN}: 
non-nullable {@link String}
+   *                   representing the type of a column.
+   *               <li>{@link ChangeStreamRecordMapper#IS_PRIMARY_KEY_COLUMN}: 
non-nullable {@link
+   *                   Boolean} indicating if the column is part of the 
primary key.
+   *               <li>{@link 
ChangeStreamRecordMapper#ORDINAL_POSITION_COLUMN}: non-nullable {@link
+   *                   Long} representing the position of the column in the 
table in which it is defined.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MODS_COLUMN}: non-nullable 
{@link List} of {@link
+   *             Struct}s representing the data modifications within this 
record.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#KEYS_COLUMN}: 
non-nullable {@link String}
+   *                   json object, where keys are the primary key column 
names, and the values are
+   *                   their corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#OLD_VALUES_COLUMN}: 
nullable {@link String}
+   *                   json object displaying the old state of the columns 
modified, where keys are
+   *                   the column names, and the values are their 
corresponding values.
+   *               <li>{@link ChangeStreamRecordMapper#NEW_VALUES_COLUMN}: 
nullable {@link String}
+   *                   json object displaying the new state of the columns 
modified, where keys are
+   *                   the column names, and the values are their 
corresponding values.
+   *             </ul>
+   *         <li>{@link ChangeStreamRecordMapper#MOD_TYPE_COLUMN}: 
non-nullable {@link String}
+   *             representing the type of operation that caused the 
modifications (see also {@link
+   *             ModType}).
+   *         <li>{@link ChangeStreamRecordMapper#VALUE_CAPTURE_TYPE_COLUMN}: 
non-nullable {@link
+   *             String} representing the capture type of the change stream 
that generated this
+   *             record (see also {@link ValueCaptureType}).
+   *         <li>{@link 
ChangeStreamRecordMapper#NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of 
data change records for
+   *             the transaction in which this record occurred.
+   *         <li>{@link 
ChangeStreamRecordMapper#NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN}:
+   *             non-nullable {@link Long} representing the total number of 
partitions for the
+   *             transaction in which this record occurred.
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#HEARTBEAT_RECORD_COLUMN}: 
non-nullable {@link Struct}
+   *       list of heartbeat records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#TIMESTAMP_COLUMN}: 
non-nullable {@link Timestamp}
+   *             representing the timestamp for which the change stream query 
has returned all
+   *             changes (see more in {@link HeartbeatRecord#getTimestamp()}).
+   *       </ul>
+   *   <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_RECORD_COLUMN}: 
non-nullable {@link
+   *       Struct} list of child partitions records.
+   *       <ul>
+   *         <li>{@link ChangeStreamRecordMapper#START_TIMESTAMP_COLUMN}: 
non-nullable {@link
+   *             Timestamp} representing the timestamp at which the new 
partition started being
+   *             valid in Cloud Spanner.
+   *         <li>{@link ChangeStreamRecordMapper#RECORD_SEQUENCE_COLUMN}: 
non-nullable {@link
+   *             String} representing the order in which this record appears 
within the context of a
+   *             partition and commit timestamp.
+   *         <li>{@link ChangeStreamRecordMapper#CHILD_PARTITIONS_COLUMN}: 
non-nullable {@link List}
+   *             of {@link Struct} representing the new child partitions.
+   *             <ul>
+   *               <li>{@link ChangeStreamRecordMapper#TOKEN_COLUMN}: 
non-nullable {@link String}
+   *                   representing the unique identifier of the new child 
partition.
+   *               <li>{@link 
ChangeStreamRecordMapper#PARENT_PARTITION_TOKENS_COLUMN}: non-nullable
+   *                   {@link List} of {@link String} representing the unique 
identifier(s) of
+   *                   parent partition(s) where this child partition 
originated from.
+   *             </ul>
+   *       </ul>
+   * </ul>
+   *
+   * @param partition the partition metadata from which the row was generated
+   * @param row the struct row, representing a single change stream result (it 
may contain multiple
+   *     change stream records within)
+   * @param resultSetMetadata the metadata generated when reading the change 
stream row
+   * @return a {@link List} of {@link ChangeStreamRecord} subclasses
+   */
+  public List<ChangeStreamRecord> toChangeStreamRecords(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata 
resultSetMetadata) {
+    return row.getStructList(0).stream()
+        .flatMap(struct -> toChangeStreamRecord(partition, struct, 
resultSetMetadata))
+        .collect(Collectors.toList());
+  }
+
+  private Stream<ChangeStreamRecord> toChangeStreamRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata 
resultSetMetadata) {
+
+    final Stream<DataChangeRecord> dataChangeRecords =
+        row.getStructList(DATA_CHANGE_RECORD_COLUMN).stream()
+            .filter(this::isNonNullDataChangeRecord)
+            .map(struct -> toDataChangeRecord(partition, struct, 
resultSetMetadata));
+
+    final Stream<HeartbeatRecord> heartbeatRecords =
+        row.getStructList(HEARTBEAT_RECORD_COLUMN).stream()
+            .filter(this::isNonNullHeartbeatRecord)
+            .map(struct -> toHeartbeatRecord(partition, struct, 
resultSetMetadata));
+
+    final Stream<ChildPartitionsRecord> childPartitionsRecords =
+        row.getStructList(CHILD_PARTITIONS_RECORD_COLUMN).stream()
+            .filter(this::isNonNullChildPartitionsRecord)
+            .map(struct -> toChildPartitionsRecord(partition, struct, 
resultSetMetadata));
+
+    return Stream.concat(
+        Stream.concat(dataChangeRecords, heartbeatRecords), 
childPartitionsRecords);
+  }
+
+  private boolean isNonNullDataChangeRecord(Struct row) {
+    return !row.isNull(COMMIT_TIMESTAMP_COLUMN);
+  }
+
+  private boolean isNonNullHeartbeatRecord(Struct row) {
+    return !row.isNull(TIMESTAMP_COLUMN);
+  }
+
+  private boolean isNonNullChildPartitionsRecord(Struct row) {
+    return !row.isNull(START_TIMESTAMP_COLUMN);
+  }
+
+  private DataChangeRecord toDataChangeRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata 
resultSetMetadata) {
+    final Timestamp commitTimestamp = 
row.getTimestamp(COMMIT_TIMESTAMP_COLUMN);
+    return new DataChangeRecord(
+        partition.getPartitionToken(),
+        commitTimestamp,
+        row.getString(SERVER_TRANSACTION_ID_COLUMN),
+        row.getBoolean(IS_LAST_RECORD_IN_TRANSACTION_IN_PARTITION_COLUMN),
+        row.getString(RECORD_SEQUENCE_COLUMN),
+        row.getString(TABLE_NAME_COLUMN),
+        row.getStructList(COLUMN_TYPES_COLUMN).stream()
+            .map(this::columnTypeFrom)
+            .collect(Collectors.toList()),
+        
row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()),
+        ModType.valueOf(row.getString(MOD_TYPE_COLUMN)),
+        ValueCaptureType.valueOf(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
+        row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN),
+        row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN),
+        changeStreamRecordMetadataFrom(partition, commitTimestamp, 
resultSetMetadata));
+  }
+
+  private HeartbeatRecord toHeartbeatRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata 
resultSetMetadata) {
+    final Timestamp timestamp = row.getTimestamp(TIMESTAMP_COLUMN);
+
+    return new HeartbeatRecord(
+        timestamp, changeStreamRecordMetadataFrom(partition, timestamp, 
resultSetMetadata));
+  }
+
+  private ChildPartitionsRecord toChildPartitionsRecord(
+      PartitionMetadata partition, Struct row, ChangeStreamResultSetMetadata 
resultSetMetadata) {
+    final Timestamp startTimestamp = row.getTimestamp(START_TIMESTAMP_COLUMN);
+
+    return new ChildPartitionsRecord(
+        startTimestamp,
+        row.getString(RECORD_SEQUENCE_COLUMN),
+        row.getStructList(CHILD_PARTITIONS_COLUMN).stream()
+            .map(struct -> childPartitionFrom(partition.getPartitionToken(), 
struct))
+            .collect(Collectors.toList()),
+        changeStreamRecordMetadataFrom(partition, startTimestamp, 
resultSetMetadata));
+  }
+
+  private ColumnType columnTypeFrom(Struct struct) {
+    return new ColumnType(
+        struct.getString(NAME_COLUMN),
+        new TypeCode(struct.getString(TYPE_COLUMN)),
+        struct.getBoolean(IS_PRIMARY_KEY_COLUMN),
+        struct.getLong(ORDINAL_POSITION_COLUMN));
+  }
+
+  private Mod modFrom(Struct struct) {
+    final String keysJson = struct.getString(KEYS_COLUMN);
+    final String oldValuesJson =
+        struct.isNull(OLD_VALUES_COLUMN) ? null : 
struct.getString(OLD_VALUES_COLUMN);
+    final String newValuesJson =
+        struct.isNull(NEW_VALUES_COLUMN)
+            ? null
+            : struct.getString(ChangeStreamRecordMapper.NEW_VALUES_COLUMN);
+    return new Mod(keysJson, oldValuesJson, newValuesJson);
+  }
+
+  private ChildPartition childPartitionFrom(String partitionToken, Struct 
struct) {
+    final HashSet<String> parentTokens =
+        Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
+    if (InitialPartition.isInitialPartition(partitionToken)) {
+      parentTokens.add(partitionToken);
+    }
+    return new ChildPartition(struct.getString(TOKEN_COLUMN), parentTokens);
+  }
+
+  private ChangeStreamRecordMetadata changeStreamRecordMetadataFrom(
+      PartitionMetadata partition,
+      Timestamp recordTimestamp,
+      ChangeStreamResultSetMetadata resultSetMetadata) {
+    return ChangeStreamRecordMetadata.newBuilder()
+        .withRecordTimestamp(recordTimestamp)
+        .withPartitionToken(partition.getPartitionToken())
+        .withPartitionStartTimestamp(partition.getStartTimestamp())
+        .withPartitionEndTimestamp(partition.getEndTimestamp())
+        .withPartitionCreatedAt(partition.getCreatedAt())
+        .withPartitionScheduledAt(partition.getScheduledAt())
+        .withPartitionRunningAt(partition.getRunningAt())
+        .withQueryStartedAt(resultSetMetadata.getQueryStartedAt())
+        
.withRecordStreamStartedAt(resultSetMetadata.getRecordStreamStartedAt())
+        .withRecordStreamEndedAt(resultSetMetadata.getRecordStreamEndedAt())
+        .withRecordReadAt(resultSetMetadata.getRecordReadAt())
+        
.withTotalStreamTimeMillis(resultSetMetadata.getTotalStreamDuration().getMillis())
+        .withNumberOfRecordsRead(resultSetMetadata.getNumberOfRecordsRead())
+        .build();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
new file mode 100644
index 0000000..5227463
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/MapperFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import java.io.Serializable;
+
+/**
+ * Factory class for creating instances that will map a struct to a connector 
model. The instances
+ * created are all singletons.
+ */
+// static fields are un-initialized, because we start them during the first 
fetch call (with the
+// singleton pattern)
+@SuppressWarnings("initialization.static.fields.uninitialized")
+public class MapperFactory implements Serializable {
+
+  private static final long serialVersionUID = -813434573067800902L;
+
+  private static ChangeStreamRecordMapper changeStreamRecordMapperInstance;
+  private static PartitionMetadataMapper partitionMetadataMapperInstance;
+
+  /**
+   * Creates and returns a singleton instance of a mapper class capable of 
transforming a {@link
+   * com.google.cloud.spanner.Struct} into a {@link java.util.List} of {@link
+   * 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord} 
subclasses.
+   *
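+   * <p>This method is thread safe for callers sharing this factory instance (it locks on
+   * {@code this}, while the cached mapper is held in a static field).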
+   *
+   * @return singleton instance of the {@link ChangeStreamRecordMapper}
+   */
+  public synchronized ChangeStreamRecordMapper changeStreamRecordMapper() {
+    if (changeStreamRecordMapperInstance == null) {
+      changeStreamRecordMapperInstance = new ChangeStreamRecordMapper();
+    }
+    return changeStreamRecordMapperInstance;
+  }
+
+  /**
+   * Creates and returns a single instance of a mapper class capable of 
transforming a {@link
+   * com.google.cloud.spanner.Struct} into a {@link
+   * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata} 
class.
+   *
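+   * <p>This method is thread safe for callers sharing this factory instance (it locks on
+   * {@code this}, while the cached mapper is held in a static field).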
+   *
+   * @return singleton instance of the {@link PartitionMetadataMapper}
+   */
+  public synchronized PartitionMetadataMapper partitionMetadataMapper() {
+    if (partitionMetadataMapperInstance == null) {
+      partitionMetadataMapperInstance = new PartitionMetadataMapper();
+    }
+    return partitionMetadataMapperInstance;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
new file mode 100644
index 0000000..1e0ece1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapper.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.List;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+
+/** This class is responsible for transforming a {@link Struct} to a {@link 
PartitionMetadata}. */
+public class PartitionMetadataMapper {
+
+  PartitionMetadataMapper() {}
+
+  /**
+   * Transforms a {@link Struct} representing a partition metadata row into a 
{@link
+   * PartitionMetadata} model. The {@link Struct} is expected to have the 
following fields:
+   *
+   * <ul>
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_PARTITION_TOKEN}: 
non-nullable {@link String}
+   *       representing the partition unique identifier.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_PARENT_TOKENS}: 
non-nullable {@link List} of
+   *       {@link String} representing the partition parents' unique 
identifiers.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_START_TIMESTAMP}: 
non-nullable {@link Timestamp}
+   *       representing the start time at which the partition started existing 
in Cloud Spanner.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_END_TIMESTAMP}: nullable 
{@link Timestamp}
+   *       representing the end time for querying this partition.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_HEARTBEAT_MILLIS}: 
non-nullable {@link Long}
+   *       representing the number of milliseconds of stream idleness after 
which a heartbeat
+   *       record will be emitted.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_STATE}: non-nullable {@link 
String} representing
+   *       the {@link State} the partition is in.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_WATERMARK}: non-nullable 
{@link Timestamp}
+   *       representing the time for which all records with a timestamp less 
than it have been
+   *       processed.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_CREATED_AT}: non-nullable 
{@link Timestamp}
+   *       representing the time at which this partition was first detected.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_SCHEDULED_AT}: nullable 
{@link Timestamp}
+   *       representing the time at which this partition was scheduled to be 
queried.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_RUNNING_AT}: nullable 
{@link Timestamp}
+   *       representing the time at which the connector started processing 
this partition.
+   *   <li>{@link PartitionMetadataAdminDao#COLUMN_FINISHED_AT}: nullable 
{@link Timestamp}
+   *       representing the time at which the connector finished processing 
this partition.
+   * </ul>
+   *
+   * @param row the {@link Struct} row to be converted. It should contain all 
the fields as
+   *     specified above.
+   * @return a {@link PartitionMetadata} with the mapped {@link Struct} field 
values.
+   */
+  public PartitionMetadata from(Struct row) {
+    return PartitionMetadata.newBuilder()
+        .setPartitionToken(row.getString(COLUMN_PARTITION_TOKEN))
+        
.setParentTokens(Sets.newHashSet(row.getStringList(COLUMN_PARENT_TOKENS)))
+        .setStartTimestamp(row.getTimestamp(COLUMN_START_TIMESTAMP))
+        .setEndTimestamp(
+            !row.isNull(COLUMN_END_TIMESTAMP) ? 
row.getTimestamp(COLUMN_END_TIMESTAMP) : null)
+        .setHeartbeatMillis(row.getLong(COLUMN_HEARTBEAT_MILLIS))
+        .setState(State.valueOf(row.getString(COLUMN_STATE)))
+        .setWatermark(row.getTimestamp(COLUMN_WATERMARK))
+        .setCreatedAt(row.getTimestamp(COLUMN_CREATED_AT))
+        .setScheduledAt(
+            !row.isNull(COLUMN_SCHEDULED_AT) ? 
row.getTimestamp(COLUMN_SCHEDULED_AT) : null)
+        .setRunningAt(!row.isNull(COLUMN_RUNNING_AT) ? 
row.getTimestamp(COLUMN_RUNNING_AT) : null)
+        .setFinishedAt(
+            !row.isNull(COLUMN_FINISHED_AT) ? 
row.getTimestamp(COLUMN_FINISHED_AT) : null)
+        .build();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
new file mode 100644
index 0000000..7f918fd
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Mapping related functionality, such as from {@link 
com.google.cloud.spanner.ResultSet}s to Change
+ * Stream models.
+ */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
new file mode 100644
index 0000000..26609f6
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapperTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStruct;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.TypeCode;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ValueCaptureType;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ChangeStreamRecordMapper}.
+ *
+ * <p>Each test builds an expected change stream record, converts it into a {@link Struct} with
+ * {@code TestStructMapper.recordsToStruct}, and asserts that the mapper turns that struct back
+ * into a single-element list holding an equal record.
+ */
+public class ChangeStreamRecordMapperTest {
+
+  private ChangeStreamRecordMapper mapper;
+  private PartitionMetadata partition;
+  private ChangeStreamResultSetMetadata resultSetMetadata;
+
+  /**
+   * Creates the mapper under test, a {@code RUNNING} partition whose token is {@code
+   * "partitionToken"}, and a mocked result set metadata that returns fixed timestamps, a fixed
+   * stream duration and a fixed record count.
+   */
+  @Before
+  public void setUp() {
+    mapper = new ChangeStreamRecordMapper();
+    partition =
+        PartitionMetadata.newBuilder()
+            .setPartitionToken("partitionToken")
+            .setParentTokens(Sets.newHashSet("parentToken"))
+            .setHeartbeatMillis(30_000L)
+            .setState(State.RUNNING)
+            .setWatermark(Timestamp.ofTimeMicroseconds(10L))
+            .setStartTimestamp(Timestamp.ofTimeMicroseconds(11L))
+            .setEndTimestamp(Timestamp.ofTimeMicroseconds(12L))
+            .setCreatedAt(Timestamp.ofTimeMicroseconds(13L))
+            .setScheduledAt(Timestamp.ofTimeMicroseconds(14L))
+            .setRunningAt(Timestamp.ofTimeMicroseconds(15L))
+            .build();
+    resultSetMetadata = mock(ChangeStreamResultSetMetadata.class);
+    when(resultSetMetadata.getQueryStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(1L));
+    when(resultSetMetadata.getRecordStreamStartedAt()).thenReturn(Timestamp.ofTimeMicroseconds(2L));
+    when(resultSetMetadata.getRecordStreamEndedAt()).thenReturn(Timestamp.ofTimeMicroseconds(3L));
+    when(resultSetMetadata.getRecordReadAt()).thenReturn(Timestamp.ofTimeMicroseconds(4L));
+    when(resultSetMetadata.getTotalStreamDuration()).thenReturn(Duration.millis(100));
+    when(resultSetMetadata.getNumberOfRecordsRead()).thenReturn(10_000L);
+  }
+
+  /** An {@code UPDATE} data change record carrying both old and new values is mapped back. */
+  @Test
+  public void testMappingUpdateStructRowToDataChangeRecord() {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "serverTransactionId",
+            true,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod(
+                    "{\"column1\": \"value1\"}",
+                    "{\"column2\": \"oldValue2\"}",
+                    "{\"column2\": \"newValue2\"}")),
+            ModType.UPDATE,
+            ValueCaptureType.OLD_AND_NEW_VALUES,
+            10L,
+            2L,
+            null);
+    final Struct struct = recordsToStruct(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
+  /** An {@code INSERT} data change record, whose mod has no old values, is mapped back. */
+  @Test
+  public void testMappingInsertStructRowToDataChangeRecord() {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "transactionId",
+            false,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
+            ModType.INSERT,
+            ValueCaptureType.OLD_AND_NEW_VALUES,
+            10L,
+            2L,
+            null);
+    final Struct struct = recordsToStruct(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
+  /** A {@code DELETE} data change record, whose mod has no new values, is mapped back. */
+  @Test
+  public void testMappingDeleteStructRowToDataChangeRecord() {
+    final DataChangeRecord dataChangeRecord =
+        new DataChangeRecord(
+            "partitionToken",
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "transactionId",
+            false,
+            "1",
+            "tableName",
+            Arrays.asList(
+                new ColumnType("column1", new TypeCode("type1"), true, 1L),
+                new ColumnType("column2", new TypeCode("type2"), false, 2L)),
+            Collections.singletonList(
+                new Mod("{\"column1\": \"value1\"}", "{\"column2\": \"oldValue2\"}", null)),
+            ModType.DELETE,
+            ValueCaptureType.OLD_AND_NEW_VALUES,
+            10L,
+            2L,
+            null);
+    final Struct struct = recordsToStruct(dataChangeRecord);
+
+    assertEquals(
+        Collections.singletonList(dataChangeRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
+  /** A heartbeat struct row is mapped back to an equal {@link HeartbeatRecord}. */
+  @Test
+  public void testMappingStructRowToHeartbeatRecord() {
+    final HeartbeatRecord heartbeatRecord =
+        new HeartbeatRecord(Timestamp.ofTimeSecondsAndNanos(10L, 20), null);
+    final Struct struct = recordsToStruct(heartbeatRecord);
+
+    assertEquals(
+        Collections.singletonList(heartbeatRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
+  /** Child partitions that already list their parent tokens are mapped back unchanged. */
+  @Test
+  public void testMappingStructRowToChildPartitionRecord() {
+    final ChildPartitionsRecord childPartitionsRecord =
+        new ChildPartitionsRecord(
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "1",
+            Arrays.asList(
+                new ChildPartition("childToken1", Sets.newHashSet("parentToken1", "parentToken2")),
+                new ChildPartition("childToken2", Sets.newHashSet("parentToken1", "parentToken2"))),
+            null);
+    final Struct struct = recordsToStruct(childPartitionsRecord);
+
+    assertEquals(
+        Collections.singletonList(childPartitionsRecord),
+        mapper.toChangeStreamRecords(partition, struct, resultSetMetadata));
+  }
+
+  /**
+   * Adds the default parent partition token as a parent of each child partition.
+   *
+   * <p>The struct holds child partitions with empty parent token sets and is mapped for a
+   * partition whose token is {@link InitialPartition#PARTITION_TOKEN}; the expected children each
+   * list that token as their only parent.
+   */
+  @Test
+  public void testMappingStructRowFromInitialPartitionToChildPartitionRecord() {
+    final Struct struct =
+        recordsToStruct(
+            new ChildPartitionsRecord(
+                Timestamp.ofTimeSecondsAndNanos(10L, 20),
+                "1",
+                Arrays.asList(
+                    new ChildPartition("childToken1", Sets.newHashSet()),
+                    new ChildPartition("childToken2", Sets.newHashSet())),
+                null));
+    final ChildPartitionsRecord expected =
+        new ChildPartitionsRecord(
+            Timestamp.ofTimeSecondsAndNanos(10L, 20),
+            "1",
+            Arrays.asList(
+                new ChildPartition(
+                    "childToken1", Sets.newHashSet(InitialPartition.PARTITION_TOKEN)),
+                new ChildPartition(
+                    "childToken2", Sets.newHashSet(InitialPartition.PARTITION_TOKEN))),
+            null);
+
+    final PartitionMetadata initialPartition =
+        partition.toBuilder().setPartitionToken(InitialPartition.PARTITION_TOKEN).build();
+
+    assertEquals(
+        Collections.singletonList(expected),
+        mapper.toChangeStreamRecords(initialPartition, struct, resultSetMetadata));
+  }
+
+  // TODO: Add test case for unknown record type
+  // TODO: Add test case for malformed record
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java
new file mode 100644
index 0000000..b95dd52
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/PartitionMetadataMapperTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper;
+
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_CREATED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_END_TIMESTAMP;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_FINISHED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_HEARTBEAT_MILLIS;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARENT_TOKENS;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_PARTITION_TOKEN;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_RUNNING_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_SCHEDULED_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_START_TIMESTAMP;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_STATE;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataAdminDao.COLUMN_WATERMARK;
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Collections;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Unit tests for {@link PartitionMetadataMapper}. */
+public class PartitionMetadataMapperTest {
+
+  private PartitionMetadataMapper mapper;
+
+  /** Instantiates a fresh mapper before every test. */
+  @Before
+  public void setUp() {
+    mapper = new PartitionMetadataMapper();
+  }
+
+  /** A row with every column populated maps to a metadata object holding the same values. */
+  @Test
+  public void testMapPartitionMetadataFromResultSet() {
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+    final Timestamp watermark = Timestamp.ofTimeMicroseconds(30L);
+    final Timestamp createdAt = Timestamp.ofTimeMicroseconds(40L);
+    final Timestamp scheduledAt = Timestamp.ofTimeMicroseconds(50L);
+    final Timestamp runningAt = Timestamp.ofTimeMicroseconds(60L);
+    final Timestamp finishedAt = Timestamp.ofTimeMicroseconds(70L);
+    final PartitionMetadata expected =
+        new PartitionMetadata(
+            "token",
+            Sets.newHashSet("parentToken"),
+            startTimestamp,
+            endTimestamp,
+            5_000L,
+            State.RUNNING,
+            watermark,
+            createdAt,
+            scheduledAt,
+            runningAt,
+            finishedAt);
+
+    final Struct struct =
+        Struct.newBuilder()
+            .set(COLUMN_PARTITION_TOKEN)
+            .to("token")
+            .set(COLUMN_PARENT_TOKENS)
+            .toStringArray(Collections.singletonList("parentToken"))
+            .set(COLUMN_START_TIMESTAMP)
+            .to(startTimestamp)
+            .set(COLUMN_END_TIMESTAMP)
+            .to(endTimestamp)
+            .set(COLUMN_HEARTBEAT_MILLIS)
+            .to(5_000L)
+            .set(COLUMN_STATE)
+            .to(State.RUNNING.name())
+            .set(COLUMN_WATERMARK)
+            .to(watermark)
+            .set(COLUMN_CREATED_AT)
+            .to(createdAt)
+            .set(COLUMN_SCHEDULED_AT)
+            .to(scheduledAt)
+            .set(COLUMN_RUNNING_AT)
+            .to(runningAt)
+            .set(COLUMN_FINISHED_AT)
+            .to(finishedAt)
+            .build();
+
+    assertEquals(expected, mapper.from(struct));
+  }
+
+  /**
+   * A row whose end, scheduled-at, running-at and finished-at columns are null maps to a metadata
+   * object with null in the corresponding positions.
+   */
+  @Test
+  public void testMapPartitionMetadataFromResultSetWithNulls() {
+    final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp watermark = Timestamp.ofTimeMicroseconds(30L);
+    final Timestamp createdAt = Timestamp.ofTimeMicroseconds(40L);
+    final PartitionMetadata expected =
+        new PartitionMetadata(
+            "token",
+            Sets.newHashSet("parentToken"),
+            startTimestamp,
+            null,
+            5_000L,
+            State.CREATED,
+            watermark,
+            createdAt,
+            null,
+            null,
+            null);
+
+    final Struct struct =
+        Struct.newBuilder()
+            .set(COLUMN_PARTITION_TOKEN)
+            .to("token")
+            .set(COLUMN_PARENT_TOKENS)
+            .toStringArray(Collections.singletonList("parentToken"))
+            .set(COLUMN_START_TIMESTAMP)
+            .to(startTimestamp)
+            .set(COLUMN_END_TIMESTAMP)
+            .to((Timestamp) null)
+            .set(COLUMN_HEARTBEAT_MILLIS)
+            .to(5_000L)
+            .set(COLUMN_STATE)
+            .to(State.CREATED.name())
+            .set(COLUMN_WATERMARK)
+            .to(watermark)
+            .set(COLUMN_CREATED_AT)
+            .to(createdAt)
+            .set(COLUMN_SCHEDULED_AT)
+            .to((Timestamp) null)
+            .set(COLUMN_RUNNING_AT)
+            .to((Timestamp) null)
+            .set(COLUMN_FINISHED_AT)
+            .to((Timestamp) null)
+            .build();
+
+    assertEquals(expected, mapper.from(struct));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
new file mode 100644
index 0000000..b699e0a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestStructMapper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.util;
+
+import com.google.cloud.spanner.Struct;
+import com.google.cloud.spanner.Type;
+import com.google.cloud.spanner.Type.StructField;
+import com.google.cloud.spanner.Value;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ColumnType;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
+
+public class TestStructMapper {
+
+  private static final Type CHILD_PARTITION_TYPE =
+      Type.struct(
+          StructField.of("token", Type.string()),
+          StructField.of("parent_partition_tokens", 
Type.array(Type.string())));
+  private static final Type COLUMN_TYPE_TYPE =
+      Type.struct(
+          StructField.of("name", Type.string()),
+          StructField.of("type", Type.string()),
+          StructField.of("is_primary_key", Type.bool()),
+          StructField.of("ordinal_position", Type.int64()));
+  private static final Type MOD_TYPE =
+      Type.struct(
+          StructField.of("keys", Type.string()),
+          StructField.of("new_values", Type.string()),
+          StructField.of("old_values", Type.string()));
+  private static final Type DATA_CHANGE_RECORD_TYPE =
+      Type.struct(
+          StructField.of("commit_timestamp", Type.timestamp()),
+          StructField.of("record_sequence", Type.string()),
+          StructField.of("server_transaction_id", Type.string()),
+          StructField.of("is_last_record_in_transaction_in_partition", 
Type.bool()),
+          StructField.of("table_name", Type.string()),
+          StructField.of("column_types", Type.array(COLUMN_TYPE_TYPE)),
+          StructField.of("mods", Type.array(MOD_TYPE)),
+          StructField.of("mod_type", Type.string()),
+          StructField.of("value_capture_type", Type.string()),
+          StructField.of("number_of_records_in_transaction", Type.int64()),
+          StructField.of("number_of_partitions_in_transaction", Type.int64()));
+  private static final Type HEARTBEAT_RECORD_TYPE =
+      Type.struct(StructField.of("timestamp", Type.timestamp()));
+  private static final Type CHILD_PARTITIONS_RECORD_TYPE =
+      Type.struct(
+          StructField.of("start_timestamp", Type.timestamp()),
+          StructField.of("record_sequence", Type.string()),
+          StructField.of("child_partitions", 
Type.array(CHILD_PARTITION_TYPE)));
+  private static final Type STREAM_RECORD_TYPE =
+      Type.struct(
+          StructField.of("data_change_record", 
Type.array(DATA_CHANGE_RECORD_TYPE)),
+          StructField.of("heartbeat_record", 
Type.array(HEARTBEAT_RECORD_TYPE)),
+          StructField.of("child_partitions_record", 
Type.array(CHILD_PARTITIONS_RECORD_TYPE)));
+
+  public static Struct recordsToStruct(ChangeStreamRecord... records) {
+    return Struct.newBuilder()
+        .add(
+            Value.structArray(
+                STREAM_RECORD_TYPE,
+                Arrays.stream(records)
+                    .map(TestStructMapper::streamRecordStructFrom)
+                    .collect(Collectors.toList())))
+        .build();
+  }
+
+  private static Struct streamRecordStructFrom(ChangeStreamRecord record) {
+    if (record instanceof DataChangeRecord) {
+      return streamRecordStructFrom((DataChangeRecord) record);
+    } else if (record instanceof HeartbeatRecord) {
+      return streamRecordStructFrom((HeartbeatRecord) record);
+    } else if (record instanceof ChildPartitionsRecord) {
+      return streamRecordStructFrom((ChildPartitionsRecord) record);
+    } else {
+      throw new UnsupportedOperationException("Unimplemented mapping for " + 
record.getClass());
+    }
+  }
+
+  private static Struct streamRecordStructFrom(ChildPartitionsRecord record) {
+    return Struct.newBuilder()
+        .set("data_change_record")
+        .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, 
Collections.emptyList()))
+        .set("heartbeat_record")
+        .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
+        .set("child_partitions_record")
+        .to(
+            Value.structArray(
+                CHILD_PARTITIONS_RECORD_TYPE, 
Collections.singletonList(recordStructFrom(record))))
+        .build();
+  }
+
+  private static Struct recordStructFrom(ChildPartitionsRecord record) {
+    final Value childPartitions =
+        Value.structArray(
+            CHILD_PARTITION_TYPE,
+            record.getChildPartitions().stream()
+                .map(TestStructMapper::childPartitionFrom)
+                .collect(Collectors.toList()));
+    return Struct.newBuilder()
+        .set("start_timestamp")
+        .to(record.getStartTimestamp())
+        .set("record_sequence")
+        .to(record.getRecordSequence())
+        .set("child_partitions")
+        .to(childPartitions)
+        .build();
+  }
+
+  private static Struct streamRecordStructFrom(HeartbeatRecord record) {
+    return Struct.newBuilder()
+        .set("data_change_record")
+        .to(Value.structArray(DATA_CHANGE_RECORD_TYPE, 
Collections.emptyList()))
+        .set("heartbeat_record")
+        .to(
+            Value.structArray(
+                HEARTBEAT_RECORD_TYPE, 
Collections.singletonList(recordStructFrom(record))))
+        .set("child_partitions_record")
+        .to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE, 
Collections.emptyList()))
+        .build();
+  }
+
+  private static Struct recordStructFrom(HeartbeatRecord record) {
+    return 
Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
+  }
+
+  private static Struct streamRecordStructFrom(DataChangeRecord record) {
+    return Struct.newBuilder()
+        .set("data_change_record")
+        .to(
+            Value.structArray(
+                DATA_CHANGE_RECORD_TYPE, 
Collections.singletonList(recordStructFrom(record))))
+        .set("heartbeat_record")
+        .to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
+        .set("child_partitions_record")
+        .to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE, 
Collections.emptyList()))
+        .build();
+  }
+
+  private static Struct recordStructFrom(DataChangeRecord record) {
+    final Value columnTypes =
+        Value.structArray(
+            COLUMN_TYPE_TYPE,
+            record.getRowType().stream()
+                .map(TestStructMapper::columnTypeStructFrom)
+                .collect(Collectors.toList()));
+    final Value mods =
+        Value.structArray(
+            MOD_TYPE,
+            record.getMods().stream()
+                .map(TestStructMapper::modStructFrom)
+                .collect(Collectors.toList()));
+    return Struct.newBuilder()
+        .set("commit_timestamp")
+        .to(record.getCommitTimestamp())
+        .set("record_sequence")
+        .to(record.getRecordSequence())
+        .set("server_transaction_id")
+        .to(record.getServerTransactionId())
+        .set("is_last_record_in_transaction_in_partition")
+        .to(record.isLastRecordInTransactionInPartition())
+        .set("table_name")
+        .to(record.getTableName())
+        .set("column_types")
+        .to(columnTypes)
+        .set("mods")
+        .to(mods)
+        .set("mod_type")
+        .to(record.getModType().toString())
+        .set("value_capture_type")
+        .to(record.getValueCaptureType().toString())
+        .set("number_of_records_in_transaction")
+        .to(record.getNumberOfRecordsInTransaction())
+        .set("number_of_partitions_in_transaction")
+        .to(record.getNumberOfPartitionsInTransaction())
+        .build();
+  }
+
+  private static Struct columnTypeStructFrom(ColumnType columnType) {
+    return Struct.newBuilder()
+        .set("name")
+        .to(columnType.getName())
+        .set("type")
+        .to(columnType.getType().getCode())
+        .set("is_primary_key")
+        .to(columnType.isPrimaryKey())
+        .set("ordinal_position")
+        .to(columnType.getOrdinalPosition())
+        .build();
+  }
+
+  private static Struct modStructFrom(Mod mod) {
+    return Struct.newBuilder()
+        .set("keys")
+        .to(mod.getKeysJson())
+        .set("new_values")
+        .to(mod.getNewValuesJson())
+        .set("old_values")
+        .to(mod.getOldValuesJson())
+        .build();
+  }
+
+  private static Struct childPartitionFrom(ChildPartition childPartition) {
+    return Struct.newBuilder()
+        .set("token")
+        .to(childPartition.getToken())
+        .set("parent_partition_tokens")
+        .to(Value.stringArray(childPartition.getParentTokens()))
+        .build();
+  }
+}

Reply via email to