This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new df8bead5945 Refactor RowMutationInformation to use string type (#31323)
df8bead5945 is described below

commit df8bead5945c801854f07dce6e708b1241c94696
Author: Damon <[email protected]>
AuthorDate: Wed May 29 10:49:24 2024 -0700

    Refactor RowMutationInformation to use string type (#31323)
    
    * Refactor RowMutationInformation to use string type
    
    * Remove unnecessary test
    
    * Add javadoc
    
    * Add segment too large test cases
    
    * Add hex based test cases to integration test
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |   2 +-
 .../AvroGenericRecordToStorageApiProto.java        |  17 ++-
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  |  16 ++-
 .../beam/sdk/io/gcp/bigquery/RowMutation.java      |  27 +++--
 .../io/gcp/bigquery/RowMutationInformation.java    | 111 ++++++++++++++++-
 .../beam/sdk/io/gcp/bigquery/StorageApiCDC.java    |   9 ++
 .../StorageApiDynamicDestinationsBeamRow.java      |   4 +-
 ...StorageApiDynamicDestinationsGenericRecord.java |   7 +-
 .../StorageApiDynamicDestinationsTableRow.java     |   4 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java |  40 +++++--
 .../sdk/io/gcp/testing/FakeDatasetService.java     |   3 +-
 .../AvroGenericRecordToStorageApiProtoTest.java    |   3 +-
 .../gcp/bigquery/BeamRowToStorageApiProtoTest.java |   4 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  88 ++++++++------
 .../gcp/bigquery/RowMutationInformationTest.java   | 132 +++++++++++++++++++++
 .../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java |  63 +++++++++-
 .../bigquery/TableRowToStorageApiProtoTest.java    |   3 +-
 17 files changed, 457 insertions(+), 76 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 3094af5855e..211027c12b0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -145,7 +145,7 @@ abstract class AppendClientInfo {
             true,
             null,
             null,
-            -1);
+            null);
     return msg.toByteString();
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 7141869b228..519f9391db6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -162,6 +162,19 @@ public class AvroGenericRecordToStorageApiProto {
     return builder.build();
   }
 
+  /**
+   * Forwards {@param changeSequenceNum} to {@link 
#messageFromGenericRecord(Descriptor,
+   * GenericRecord, String, String)} via {@link Long#toHexString}.
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor,
+      GenericRecord record,
+      @Nullable String changeType,
+      long changeSequenceNum) {
+    return messageFromGenericRecord(
+        descriptor, record, changeType, Long.toHexString(changeSequenceNum));
+  }
+
   /**
    * Given an Avro {@link GenericRecord} object, returns a protocol-buffer 
message that can be used
    * to write data using the BigQuery Storage streaming API.
@@ -174,7 +187,7 @@ public class AvroGenericRecordToStorageApiProto {
       Descriptor descriptor,
       GenericRecord record,
       @Nullable String changeType,
-      long changeSequenceNum) {
+      @Nullable String changeSequenceNum) {
     Schema schema = record.getSchema();
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
     for (Schema.Field field : schema.getFields()) {
@@ -195,7 +208,7 @@ public class AvroGenericRecordToStorageApiProto {
       builder.setField(
           org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
               descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
-          changeSequenceNum);
+          
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(changeSequenceNum));
     }
     return builder.build();
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index d91ddd6843c..4275125ef16 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -143,12 +143,24 @@ public class BeamRowToStorageApiProto {
                   ((EnumerationType) 
logicalType).toString((EnumerationType.Value) value))
           .build();
 
+  /**
+   * Forwards {@code changeSequenceNum} to {@link 
#messageFromBeamRow(Descriptor, Row, String,
+   * String)} via {@link Long#toHexString}.
+   */
+  public static DynamicMessage messageFromBeamRow(
+      Descriptor descriptor, Row row, @Nullable String changeType, long 
changeSequenceNum) {
+    return messageFromBeamRow(descriptor, row, changeType, 
Long.toHexString(changeSequenceNum));
+  }
+
   /**
    * Given a Beam {@link Row} object, returns a protocol-buffer message that 
can be used to write
    * data using the BigQuery Storage streaming API.
    */
   public static DynamicMessage messageFromBeamRow(
-      Descriptor descriptor, Row row, @Nullable String changeType, long 
changeSequenceNum) {
+      Descriptor descriptor,
+      Row row,
+      @Nullable String changeType,
+      @Nullable String changeSequenceNum) {
     Schema beamSchema = row.getSchema();
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
     for (int i = 0; i < row.getFieldCount(); ++i) {
@@ -170,7 +182,7 @@ public class BeamRowToStorageApiProto {
       builder.setField(
           org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
               descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
-          changeSequenceNum);
+          
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(changeSequenceNum));
     }
     return builder.build();
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
index b0a6d460934..bd9d5d2a4ab 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 
 /**
  * A convenience class for applying row updates to BigQuery using {@link
@@ -43,7 +45,12 @@ public abstract class RowMutation {
   }
 
   public static class RowMutationCoder extends AtomicCoder<RowMutation> {
+    private static final TableRowJsonCoder ROW_JSON_CODER = 
TableRowJsonCoder.of();
     private static final RowMutationCoder INSTANCE = new RowMutationCoder();
+    private static final VarIntCoder INT_CODER = VarIntCoder.of();
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final RowMutationInformation.MutationType[] MUTATION_TYPES =
+        RowMutationInformation.MutationType.values();
 
     public static RowMutationCoder of() {
       return INSTANCE;
@@ -51,19 +58,21 @@ public abstract class RowMutation {
 
     @Override
     public void encode(RowMutation value, OutputStream outStream) throws 
IOException {
-      TableRowJsonCoder.of().encode(value.getTableRow(), outStream);
-      VarIntCoder.of()
-          .encode(value.getMutationInformation().getMutationType().ordinal(), 
outStream);
-      
VarLongCoder.of().encode(value.getMutationInformation().getSequenceNumber(), 
outStream);
+      ROW_JSON_CODER.encode(value.getTableRow(), outStream);
+      RowMutationInformation mutationInformation = 
value.getMutationInformation();
+      INT_CODER.encode(mutationInformation.getMutationType().ordinal(), 
outStream);
+      STRING_CODER.encode(mutationInformation.getChangeSequenceNumber(), 
outStream);
     }
 
     @Override
     public RowMutation decode(InputStream inStream) throws IOException {
+      TableRow tableRow = ROW_JSON_CODER.decode(inStream);
+      int mutationTypeOrdinal = INT_CODER.decode(inStream);
+      checkState(mutationTypeOrdinal >= 0 && mutationTypeOrdinal < 
MUTATION_TYPES.length);
+      RowMutationInformation.MutationType mutationType = 
MUTATION_TYPES[mutationTypeOrdinal];
+      String changeSequenceNumber = STRING_CODER.decode(inStream);
       return RowMutation.of(
-          TableRowJsonCoder.of().decode(inStream),
-          RowMutationInformation.of(
-              
RowMutationInformation.MutationType.values()[VarIntCoder.of().decode(inStream)],
-              VarLongCoder.of().decode(inStream)));
+          tableRow, RowMutationInformation.of(mutationType, 
changeSequenceNumber));
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
index cdbf9277d86..18905d149b9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * This class indicates how to apply a row update to BigQuery. A sequence 
number must always be
@@ -37,10 +41,111 @@ public abstract class RowMutationInformation {
 
   public abstract MutationType getMutationType();
 
-  // The sequence number used
-  public abstract long getSequenceNumber();
+  /**
+   * The sequence number used to drive the order of applied row mutations.
+   * @deprecated {@link #getChangeSequenceNumber()} replaces this field as the BigQuery API
+   * instead supports the use of a string.
+   */
+  @Deprecated
+  @Nullable
+  public abstract Long getSequenceNumber();
+
+  /**
+   * The value supplied to the BigQuery {@code _CHANGE_SEQUENCE_NUMBER} 
pseudo-column. See {@link
+   * #of(MutationType, String)} for more details.
+   */
+  public abstract String getChangeSequenceNumber();
 
+  /**
+   * Instantiate {@link RowMutationInformation} with {@link MutationType} and {@code sequenceNumber}.
+   * @deprecated Use {@link #of(MutationType, String)} instead; this overload stores the
+   * {@code sequenceNumber} value in its {@link Long#toHexString(long)} form.
+   * {@code sequenceNumber} values {@code < 0} will throw an error.
+   */
+  @Deprecated
   public static RowMutationInformation of(MutationType mutationType, long 
sequenceNumber) {
-    return new AutoValue_RowMutationInformation(mutationType, sequenceNumber);
+    checkArgument(sequenceNumber >= 0, "sequenceNumber must be non-negative");
+    return new AutoValue_RowMutationInformation(
+        mutationType, null, Long.toHexString(sequenceNumber));
+  }
+
+  /**
+   * Instantiate {@link RowMutationInformation} with {@link MutationType} and 
the {@param
+   * changeSequenceNumber}, which sets the BigQuery API {@code 
_CHANGE_SEQUENCE_NUMBER} pseudo
+   * column, enabling custom user-supplied ordering of {@link RowMutation}s.
+   *
+   * <p>Requirements for the {@param changeSequenceNumber}:
+   *
+   * <ul>
+   *   <li>fixed format {@code String} in hexadecimal format
+   *   <li><strong>do not use hexadecimals encoded from negative 
numbers</strong>
+   *   <li>each hexadecimal string separated into sections by forward slash: 
{@code /}
+   *   <li>up to four sections allowed
+   *   <li>each section is limited to {@code 16} hexadecimal characters: 
{@code 0-9}, {@code A-F},
+   *       or {@code a-f}
+   *   <li>The allowable range supported are values between {@code 0/0/0/0} 
and {@code
+   *       FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF}
+   * </ul>
+   *
+   * <p>Below are some {@param changeSequenceNumber} scenarios:
+   *
+   * <table>
+   *     <tr>
+   *         <th>Record #1: {@param changeSequenceNumber}</th>
+   *         <th>Record #2: {@param changeSequenceNumber}</th>
+   *         <th>BigQuery API compares as</th>
+   *     </tr>
+   *     <tr>
+   *         <td>{@code "B"}</td>
+   *         <td>{@code "ABC"}</td>
+   *         <td>Record #2 is considered the latest record: {@code 'ABC' > 'B' 
(i.e. '2748' > '11')}</td>
+   *     </tr>
+   *     <tr>
+   *         <td>{@code "FFF/B"}</td>
+   *         <td>{@code "FFF/ABC"}</td>
+   *         <td>Record #2 is considered the latest record: {@code "FFF/ABC" > 
"FFF/B" (i.e. "4095/2748" > "4095/11")}</td>
+   *     </tr>
+   *     <tr>
+   *         <td>{@code "BA/FFFFFFFF"}</td>
+   *         <td>{@code "ABC"}</td>
+   *         <td>Record #2 is considered the latest record: {@code "ABC" > 
"BA/FFFFFFFF" (i.e. "2748" > "186/4294967295")}</td>
+   *     </tr>
+   *     <tr>
+   *         <td>{@code "FFF/ABC"}</td>
+   *         <td>{@code "ABC"}</td>
+   *         <td>Record #1 is considered the latest record: {@code "FFF/ABC" > 
"ABC" (i.e. "4095/2748" > "2748")}</td>
+   *     </tr>
+   *     <tr>
+   *         <td>{@code "FFF"}</td>
+   *         <td>{@code "FFF"}</td>
+   *         <td>Record #1 and #2 change sequence number identical; BigQuery 
uses system ingestion time to take precedence over previously ingested 
records.</td>
+   *     </tr>
+   * </table>
+   *
+   * <p>Below are some code examples.
+   *
+   * <ul>
+   *   <li>{@code RowMutationInformation.of(UPSERT, "FFF/ABC")}
+   *   <li>Using Apache Commons {@link 
org.apache.commons.codec.binary.Hex#encodeHexString} (Java
+   *       17+ users can use {@code HexFormat}){@code 
RowMutationInformation.of(UPSERT,
+   *       Hex.encodeHexString("2024-04-30 11:19:44 
UTC".getBytes(StandardCharsets.UTF_8)))}
+   *   <li>Using {@link Long#toHexString}: {@code 
RowMutationInformation.of(DELETE,
+   *       Long.toHexString(123L))}
+   * </ul>
+   *
+   * See <a
+   * 
href="https://cloud.google.com/bigquery/docs/change-data-capture#manage_custom_ordering">
+   * 
https://cloud.google.com/bigquery/docs/change-data-capture#manage_custom_ordering
 </a> for more
+   * details.
+   */
+  public static RowMutationInformation of(MutationType mutationType, String 
changeSequenceNumber) {
+    checkArgument(
+        !Strings.isNullOrEmpty(changeSequenceNumber), "changeSequenceNumber 
must not be empty");
+    checkArgument(
+        
StorageApiCDC.EXPECTED_SQN_PATTERN.asPredicate().test(changeSequenceNumber),
+        String.format(
+            "changeSequenceNumber: %s does not match expected pattern: %s",
+            changeSequenceNumber, 
StorageApiCDC.EXPECTED_SQN_PATTERN.pattern()));
+    return new AutoValue_RowMutationInformation(mutationType, null, 
changeSequenceNumber);
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
index 48cb9bb902c..f9240d83306 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
@@ -36,6 +36,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
  */
 
 import java.util.Set;
+import java.util.regex.Pattern;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 
 /** Constants and variables for CDC support. */
@@ -43,4 +44,12 @@ public class StorageApiCDC {
   public static final String CHANGE_SQN_COLUMN = "_CHANGE_SEQUENCE_NUMBER";
   public static final String CHANGE_TYPE_COLUMN = "_CHANGE_TYPE";
   public static final Set<String> COLUMNS = 
ImmutableSet.of(CHANGE_TYPE_COLUMN, CHANGE_SQN_COLUMN);
+
+  /**
+   * Expected valid pattern for a {@link #CHANGE_SQN_COLUMN} value for use 
with BigQuery's {@code
+   * _CHANGE_SEQUENCE_NUMBER} format. See {@link
+   * RowMutationInformation#of(RowMutationInformation.MutationType, String)} 
for more details.
+   */
+  public static final Pattern EXPECTED_SQN_PATTERN =
+      Pattern.compile("^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$");
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 7821a7d9199..70ecb06d5b8 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -86,11 +86,11 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT 
extends @NonNull Obje
     public StorageApiWritePayload toMessage(
         T element, @Nullable RowMutationInformation rowMutationInformation) 
throws Exception {
       String changeType = null;
-      long changeSequenceNum = -1;
+      String changeSequenceNum = null;
       Descriptor descriptorToUse = descriptor;
       if (rowMutationInformation != null) {
         changeType = rowMutationInformation.getMutationType().toString();
-        changeSequenceNum = rowMutationInformation.getSequenceNumber();
+        changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
         descriptorToUse = Preconditions.checkStateNotNull(cdcDescriptor);
       }
       Message msg =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index 79141d73a39..c96bb4ce752 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -81,14 +81,13 @@ class StorageApiDynamicDestinationsGenericRecord<T, 
DestinationT extends @NonNul
     @Override
     @SuppressWarnings("nullness")
     public StorageApiWritePayload toMessage(
-        T element, @javax.annotation.Nullable RowMutationInformation 
rowMutationInformation)
-        throws Exception {
+        T element, @Nullable RowMutationInformation rowMutationInformation) 
throws Exception {
       String changeType = null;
-      long changeSequenceNum = -1;
+      String changeSequenceNum = null;
       Descriptor descriptorToUse = descriptor;
       if (rowMutationInformation != null) {
         changeType = rowMutationInformation.getMutationType().toString();
-        changeSequenceNum = rowMutationInformation.getSequenceNumber();
+        changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
         descriptorToUse = cdcDescriptor;
       }
       Message msg =
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index eb93d7c398f..264dac34473 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -161,11 +161,11 @@ public class StorageApiDynamicDestinationsTableRow<T, 
DestinationT extends @NonN
       TableRow tableRow = formatFunction.apply(element);
 
       String changeType = null;
-      long changeSequenceNum = -1;
+      String changeSequenceNum = null;
       Descriptor descriptorToUse = descriptor;
       if (rowMutationInformation != null) {
         changeType = rowMutationInformation.getMutationType().toString();
-        changeSequenceNum = rowMutationInformation.getSequenceNumber();
+        changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
         descriptorToUse = Preconditions.checkStateNotNull(cdcDescriptor);
       }
       // If autoSchemaUpdates==true, then we allow unknown values at this step 
and insert them into
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 4d714aaaf77..07457e72050 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -480,7 +480,7 @@ public class TableRowToStorageApiProto {
       boolean allowMissingRequiredFields,
       @Nullable TableRow unknownFields,
       @Nullable String changeType,
-      long changeSequenceNum)
+      @Nullable String changeSequenceNum)
       throws SchemaConversionException {
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
     for (final Map.Entry<String, Object> entry : map.entrySet()) {
@@ -542,7 +542,7 @@ public class TableRowToStorageApiProto {
       builder.setField(
           Preconditions.checkStateNotNull(
               descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
-          changeSequenceNum);
+          Preconditions.checkStateNotNull(changeSequenceNum));
     }
 
     try {
@@ -553,6 +553,32 @@ public class TableRowToStorageApiProto {
     }
   }
 
+  /**
+   * Forwards {@param changeSequenceNum} to {@link 
#messageFromTableRow(SchemaInformation,
+   * Descriptor, TableRow, boolean, boolean, TableRow, String, String)} via 
{@link
+   * Long#toHexString}.
+   */
+  public static DynamicMessage messageFromTableRow(
+      SchemaInformation schemaInformation,
+      Descriptor descriptor,
+      TableRow tableRow,
+      boolean ignoreUnknownValues,
+      boolean allowMissingRequiredFields,
+      final @Nullable TableRow unknownFields,
+      @Nullable String changeType,
+      long changeSequenceNum)
+      throws SchemaConversionException {
+    return messageFromTableRow(
+        schemaInformation,
+        descriptor,
+        tableRow,
+        ignoreUnknownValues,
+        allowMissingRequiredFields,
+        unknownFields,
+        changeType,
+        Long.toHexString(changeSequenceNum));
+  }
+
   /**
    * Given a BigQuery TableRow, returns a protocol-buffer message that can be 
used to write data
    * using the BigQuery Storage API.
@@ -566,7 +592,7 @@ public class TableRowToStorageApiProto {
       boolean allowMissingRequiredFields,
       final @Nullable TableRow unknownFields,
       @Nullable String changeType,
-      long changeSequenceNum)
+      @Nullable String changeSequenceNum)
       throws SchemaConversionException {
     @Nullable Object fValue = tableRow.get("f");
     if (fValue instanceof List) {
@@ -638,7 +664,7 @@ public class TableRowToStorageApiProto {
         builder.setField(
             Preconditions.checkStateNotNull(
                 descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
-            changeSequenceNum);
+            Preconditions.checkStateNotNull(changeSequenceNum));
       }
 
       // If there are unknown fields, copy them into the output.
@@ -743,7 +769,7 @@ public class TableRowToStorageApiProto {
       fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
       fieldDescriptorBuilder = 
fieldDescriptorBuilder.setName(StorageApiCDC.CHANGE_SQN_COLUMN);
       fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(i++);
-      fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+      fieldDescriptorBuilder = 
fieldDescriptorBuilder.setType(Type.TYPE_STRING);
       fieldDescriptorBuilder = 
fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
       descriptorBuilder.addField(fieldDescriptorBuilder.build());
     }
@@ -1013,7 +1039,7 @@ public class TableRowToStorageApiProto {
               allowMissingRequiredFields,
               getUnknownNestedFields.get(),
               null,
-              -1);
+              null);
         } else if (value instanceof AbstractMap) {
           // This will handle nested rows.
           AbstractMap<String, Object> map = ((AbstractMap<String, Object>) 
value);
@@ -1025,7 +1051,7 @@ public class TableRowToStorageApiProto {
               allowMissingRequiredFields,
               getUnknownNestedFields.get(),
               null,
-              -1);
+              null);
         }
         break;
       default:
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1e746d7f96b..83c63d1c75f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -661,7 +661,8 @@ public class FakeDatasetService implements DatasetService, 
WriteStreamService, S
             }
             fieldDescriptor = 
protoDescriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN);
             if (fieldDescriptor != null) {
-              changeSequenceNum = (long) msg.getField(fieldDescriptor);
+              String changeSequenceNumHex = (String) 
msg.getField(fieldDescriptor);
+              changeSequenceNum = Long.parseUnsignedLong(changeSequenceNumHex, 
16);
             }
             Stream.Entry.UpdateType insertType = 
Stream.Entry.UpdateType.INSERT;
             if (insertTypeStr != null) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index ac73a0122d8..3a4dcb02ebd 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -488,7 +488,8 @@ public class AvroGenericRecordToStorageApiProtoTest {
         descriptor.getFields().stream()
             .collect(Collectors.toMap(Descriptors.FieldDescriptor::getName, 
Functions.identity()));
     assertEquals("UPDATE", 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_TYPE_COLUMN)));
-    assertEquals(42L, 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
+    assertEquals(
+        Long.toHexString(42L), 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index af025d9f030..5c43688a6ef 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -411,7 +411,9 @@ public class BeamRowToStorageApiProtoTest {
     assertBaseRecord(nestedMsg);
     assertEquals(
         "UPDATE", 
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_TYPE_COLUMN)));
-    assertEquals(42L, 
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)));
+    assertEquals(
+        Long.toHexString(42L),
+        
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)));
   }
 
   @Test
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index f42734af767..d303948fe44 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -3596,28 +3596,36 @@ public class BigQueryIOWriteTest implements 
Serializable {
         Lists.newArrayList(
             RowMutation.of(
                 new TableRow().set("key1", "foo0").set("key2", 
"bar0").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(0L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(0L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo0").set("key2", 
"bar0").set("value", "2"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 1)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(1L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.DELETE, 
Long.toHexString(1L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo3").set("key2", 
"bar3").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(0L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "3"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 2)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(2L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo4").set("key2", 
"bar4").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
Long.toHexString(0L))),
             RowMutation.of(
                 new TableRow().set("key1", "foo4").set("key2", 
"bar4").set("value", "1"),
-                
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)));
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.DELETE, 
Long.toHexString(1L))));
 
     BigQueryIO.Write<RowMutation> write =
         BigQueryIO.applyRowMutations()
@@ -3655,7 +3663,7 @@ public class BigQueryIOWriteTest implements Serializable {
                     new TableFieldSchema().setName("key2").setType("STRING"),
                     new TableFieldSchema().setName("value").setType("STRING"),
                     new 
TableFieldSchema().setName("updateType").setType("STRING"),
-                    new TableFieldSchema().setName("sqn").setType("INT64")));
+                    new TableFieldSchema().setName("sqn").setType("STRING")));
 
     Table fakeTable = new Table();
     TableReference ref =
@@ -3675,7 +3683,7 @@ public class BigQueryIOWriteTest implements Serializable {
             .optionalString("key2")
             .optionalString("value")
             .optionalString("updateType")
-            .optionalLong("sqn")
+            .requiredString("sqn")
             .endRecord();
 
     List<GenericRecord> items =
@@ -3685,56 +3693,56 @@ public class BigQueryIOWriteTest implements 
Serializable {
                 .set("key2", "bar0")
                 .set("value", "1")
                 .set("updateType", "UPSERT")
-                .set("sqn", 0L)
+                .set("sqn", Long.toHexString(0L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo1")
                 .set("key2", "bar1")
                 .set("value", "1")
                 .set("updateType", "UPSERT")
-                .set("sqn", 0L)
+                .set("sqn", Long.toHexString(0L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo0")
                 .set("key2", "bar0")
                 .set("value", "2")
                 .set("updateType", "UPSERT")
-                .set("sqn", 1L)
+                .set("sqn", Long.toHexString(1L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo1")
                 .set("key2", "bar1")
                 .set("value", "1")
                 .set("updateType", "DELETE")
-                .set("sqn", 1L)
+                .set("sqn", Long.toHexString(1L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo3")
                 .set("key2", "bar3")
                 .set("value", "1")
                 .set("updateType", "UPSERT")
-                .set("sqn", 0L)
+                .set("sqn", Long.toHexString(0L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo1")
                 .set("key2", "bar1")
                 .set("value", "3")
                 .set("updateType", "UPSERT")
-                .set("sqn", 2L)
+                .set("sqn", Long.toHexString(2L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo4")
                 .set("key2", "bar4")
                 .set("value", "1")
                 .set("updateType", "UPSERT")
-                .set("sqn", 0L)
+                .set("sqn", Long.toHexString(0L))
                 .build(),
             new GenericRecordBuilder(avroSchema)
                 .set("key1", "foo4")
                 .set("key2", "bar4")
                 .set("value", "1")
                 .set("updateType", "DELETE")
-                .set("sqn", 1L)
+                .set("sqn", Long.toHexString(1L))
                 .build());
 
     BigQueryIO.Write<GenericRecord> write =
@@ -3744,10 +3752,12 @@ public class BigQueryIOWriteTest implements 
Serializable {
             .withSchema(tableSchema)
             .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
             .withRowMutationInformationFn(
-                r ->
-                    RowMutationInformation.of(
-                        
RowMutationInformation.MutationType.valueOf(r.get("updateType").toString()),
-                        (long) r.get("sqn")))
+                r -> {
+                  RowMutationInformation.MutationType mutationType =
+                      
RowMutationInformation.MutationType.valueOf(r.get("updateType").toString());
+                  String sqn = r.get("sqn").toString();
+                  return RowMutationInformation.of(mutationType, sqn);
+                })
             .withoutValidation()
             .withTestServices(fakeBqServices);
 
@@ -3761,19 +3771,19 @@ public class BigQueryIOWriteTest implements 
Serializable {
                 .set("key2", "bar0")
                 .set("value", "2")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "1"),
+                .set("sqn", Long.toHexString(1)),
             new TableRow()
                 .set("key1", "foo1")
                 .set("key2", "bar1")
                 .set("value", "3")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "2"),
+                .set("sqn", Long.toHexString(2)),
             new TableRow()
                 .set("key1", "foo3")
                 .set("key2", "bar3")
                 .set("value", "1")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "0"));
+                .set("sqn", Long.toHexString(0)));
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
@@ -3793,7 +3803,7 @@ public class BigQueryIOWriteTest implements Serializable {
                     new TableFieldSchema().setName("key2").setType("STRING"),
                     new TableFieldSchema().setName("value").setType("STRING"),
                     new 
TableFieldSchema().setName("updateType").setType("STRING"),
-                    new TableFieldSchema().setName("sqn").setType("INT64")));
+                    new TableFieldSchema().setName("sqn").setType("STRING")));
 
     Table fakeTable = new Table();
     TableReference ref =
@@ -3812,7 +3822,7 @@ public class BigQueryIOWriteTest implements Serializable {
             .addNullableStringField("key2")
             .addNullableStringField("value")
             .addNullableStringField("updateType")
-            .addNullableInt64Field("sqn")
+            .addNullableStringField("sqn")
             .build();
 
     List<Row> items =
@@ -3822,56 +3832,56 @@ public class BigQueryIOWriteTest implements 
Serializable {
                 .withFieldValue("key2", "bar0")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 0L)
+                .withFieldValue("sqn", Long.toHexString(0L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo1")
                 .withFieldValue("key2", "bar1")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 0L)
+                .withFieldValue("sqn", Long.toHexString(0L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo0")
                 .withFieldValue("key2", "bar0")
                 .withFieldValue("value", "2")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 1L)
+                .withFieldValue("sqn", Long.toHexString(1L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo1")
                 .withFieldValue("key2", "bar1")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "DELETE")
-                .withFieldValue("sqn", 1L)
+                .withFieldValue("sqn", Long.toHexString(1L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo3")
                 .withFieldValue("key2", "bar3")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 0L)
+                .withFieldValue("sqn", Long.toHexString(0L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo1")
                 .withFieldValue("key2", "bar1")
                 .withFieldValue("value", "3")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 2L)
+                .withFieldValue("sqn", Long.toHexString(2L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo4")
                 .withFieldValue("key2", "bar4")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "UPSERT")
-                .withFieldValue("sqn", 0L)
+                .withFieldValue("sqn", Long.toHexString(0L))
                 .build(),
             Row.withSchema(beamSchema)
                 .withFieldValue("key1", "foo4")
                 .withFieldValue("key2", "bar4")
                 .withFieldValue("value", "1")
                 .withFieldValue("updateType", "DELETE")
-                .withFieldValue("sqn", 1L)
+                .withFieldValue("sqn", Long.toHexString(1L))
                 .build());
 
     BigQueryIO.Write<Row> write =
@@ -3885,7 +3895,7 @@ public class BigQueryIOWriteTest implements Serializable {
                 r ->
                     RowMutationInformation.of(
                         
RowMutationInformation.MutationType.valueOf(r.getString("updateType")),
-                        r.getInt64("sqn")))
+                        r.getString("sqn")))
             .withoutValidation()
             .withTestServices(fakeBqServices);
 
@@ -3899,19 +3909,19 @@ public class BigQueryIOWriteTest implements 
Serializable {
                 .set("key2", "bar0")
                 .set("value", "2")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "1"),
+                .set("sqn", Long.toHexString(1)),
             new TableRow()
                 .set("key1", "foo1")
                 .set("key2", "bar1")
                 .set("value", "3")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "2"),
+                .set("sqn", Long.toHexString(2)),
             new TableRow()
                 .set("key1", "foo3")
                 .set("key2", "bar3")
                 .set("value", "1")
                 .set("updatetype", "UPSERT")
-                .set("sqn", "0"));
+                .set("sqn", Long.toHexString(0)));
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
new file mode 100644
index 00000000000..88d410a106a
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class RowMutationInformationTest {
+  @Test
+  public void givenLong_SQN_EQ_Zero_encodesAndInstantiates() {
+    long sqn = 0L;
+    RowMutationInformation got =
+        RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
sqn);
+    assertNotNull(got.getChangeSequenceNumber());
+    assertEquals("0", got.getChangeSequenceNumber());
+  }
+
+  @Test
+  public void givenLong_SQN_GT_Zero_encodesAndInstantiates() {
+    long sqn = 6L;
+    RowMutationInformation got =
+        RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
sqn);
+    assertNotNull(got.getChangeSequenceNumber());
+    assertEquals("6", got.getChangeSequenceNumber());
+  }
+
+  @Test
+  public void givenLong_SQN_EQ_Max_encodesAndInstantiates() {
+    long sqn = Long.MAX_VALUE;
+    RowMutationInformation got =
+        RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
sqn);
+    assertNotNull(got.getChangeSequenceNumber());
+    assertEquals("7fffffffffffffff", got.getChangeSequenceNumber());
+  }
+
+  @Test
+  public void givenLong_SQL_LT_Zero_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> 
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, -1L));
+    assertEquals("sequenceNumber must be non-negative", error.getMessage());
+  }
+
+  @Test
+  public void givenSimpleHex_encodesAndInstantiates() {
+    String sqn = "FFF/ABC/012/AAA";
+    RowMutationInformation got =
+        RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
sqn);
+    assertNotNull(got.getChangeSequenceNumber());
+    assertEquals(sqn, got.getChangeSequenceNumber());
+  }
+
+  @Test
+  public void givenTooManySegments_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
"0/0/0/0/0"));
+    assertEquals(
+        "changeSequenceNumber: 0/0/0/0/0 does not match expected pattern: 
^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+        error.getMessage());
+  }
+
+  @Test
+  public void givenEmptyString_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> 
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, ""));
+    assertEquals("changeSequenceNumber must not be empty", error.getMessage());
+  }
+
+  @Test
+  public void givenEmptySegment_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> 
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 
"0/1//3"));
+    assertEquals(
+        "changeSequenceNumber: 0/1//3 does not match expected pattern: 
^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+        error.getMessage());
+  }
+
+  @Test
+  public void givenSingleSegmentTooLarge_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
"12345678901234567"));
+    assertEquals(
+        "changeSequenceNumber: 12345678901234567 does not match expected 
pattern: ^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+        error.getMessage());
+  }
+
+  @Test
+  public void givenAddlSegmentTooLarge_throws() {
+    IllegalArgumentException error =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                RowMutationInformation.of(
+                    RowMutationInformation.MutationType.UPSERT, 
"0/12345678901234567"));
+    assertEquals(
+        "changeSequenceNumber: 0/12345678901234567 does not match expected 
pattern: ^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+        error.getMessage());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
index f8cc797a87c..10e6afed6dc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
@@ -70,7 +70,7 @@ public class StorageApiSinkRowUpdateIT {
   }
 
   @Test
-  public void testCdc() throws Exception {
+  public void testCdcUsingLongSeqNum() throws Exception {
     TableSchema tableSchema =
         new TableSchema()
             .setFields(
@@ -130,6 +130,67 @@ public class StorageApiSinkRowUpdateIT {
     assertRowsWritten(tableSpec, expected);
   }
 
+  @Test
+  public void testCdcUsingHexSequenceNum() throws Exception {
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("key1").setType("STRING"),
+                    new TableFieldSchema().setName("key2").setType("STRING"),
+                    new 
TableFieldSchema().setName("value").setType("STRING")));
+
+    List<RowMutation> items =
+        Lists.newArrayList(
+            RowMutation.of(
+                new TableRow().set("key1", "foo0").set("key2", 
"bar0").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo0").set("key2", 
"bar0").set("value", "2"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/1")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, "AAA/1")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo3").set("key2", 
"bar3").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "3"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/2")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo4").set("key2", 
"bar4").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+            RowMutation.of(
+                new TableRow().set("key1", "foo4").set("key2", 
"bar4").set("value", "1"),
+                
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 
"AAA/1")));
+
+    List<String> primaryKey = Lists.newArrayList("key1", "key2");
+    String tableSpec = getTablespec();
+    Pipeline p = Pipeline.create();
+    p.apply("Create rows", Create.of(items))
+        .apply(
+            "Apply updates",
+            BigQueryIO.applyRowMutations()
+                .to(tableSpec)
+                .withSchema(tableSchema)
+                .withPrimaryKey(primaryKey)
+                .withClustering(new Clustering().setFields(primaryKey))
+                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
+                
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
+
+    p.run();
+
+    List<TableRow> expected =
+        Lists.newArrayList(
+            new TableRow().set("key1", "foo0").set("key2", 
"bar0").set("value", "2"),
+            new TableRow().set("key1", "foo1").set("key2", 
"bar1").set("value", "3"),
+            new TableRow().set("key1", "foo3").set("key2", 
"bar3").set("value", "1"));
+    assertRowsWritten(tableSpec, expected);
+  }
+
   private void assertRowsWritten(String tableSpec, Iterable<TableRow> expected)
       throws IOException, InterruptedException {
     List<TableRow> queryResponse =
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index aae1c2096cc..847ede9df36 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -1638,6 +1638,7 @@ public class TableRowToStorageApiProtoTest {
     assertBaseRecord((DynamicMessage) 
msg.getField(fieldDescriptors.get("nestedvaluenof1")), false);
     assertBaseRecord((DynamicMessage) 
msg.getField(fieldDescriptors.get("nestedvaluenof2")), false);
     assertEquals("UPDATE", 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_TYPE_COLUMN)));
-    assertEquals(42L, 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
+    assertEquals(
+        Long.toHexString(42L), 
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
   }
 }

Reply via email to