This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new df8bead5945 Refactor RowMutationInformation to use string type (#31323)
df8bead5945 is described below
commit df8bead5945c801854f07dce6e708b1241c94696
Author: Damon <[email protected]>
AuthorDate: Wed May 29 10:49:24 2024 -0700
Refactor RowMutationInformation to use string type (#31323)
* Refactor RowMutationInformation to use string type
* Remove unnecessary test
* Add javadoc
* Add segment too large test cases
* Add hex based test cases to integration test
---
.../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 2 +-
.../AvroGenericRecordToStorageApiProto.java | 17 ++-
.../io/gcp/bigquery/BeamRowToStorageApiProto.java | 16 ++-
.../beam/sdk/io/gcp/bigquery/RowMutation.java | 27 +++--
.../io/gcp/bigquery/RowMutationInformation.java | 111 ++++++++++++++++-
.../beam/sdk/io/gcp/bigquery/StorageApiCDC.java | 9 ++
.../StorageApiDynamicDestinationsBeamRow.java | 4 +-
...StorageApiDynamicDestinationsGenericRecord.java | 7 +-
.../StorageApiDynamicDestinationsTableRow.java | 4 +-
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 40 +++++--
.../sdk/io/gcp/testing/FakeDatasetService.java | 3 +-
.../AvroGenericRecordToStorageApiProtoTest.java | 3 +-
.../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 4 +-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 88 ++++++++------
.../gcp/bigquery/RowMutationInformationTest.java | 132 +++++++++++++++++++++
.../io/gcp/bigquery/StorageApiSinkRowUpdateIT.java | 63 +++++++++-
.../bigquery/TableRowToStorageApiProtoTest.java | 3 +-
17 files changed, 457 insertions(+), 76 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
index 3094af5855e..211027c12b0 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -145,7 +145,7 @@ abstract class AppendClientInfo {
true,
null,
null,
- -1);
+ null);
return msg.toByteString();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 7141869b228..519f9391db6 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -162,6 +162,19 @@ public class AvroGenericRecordToStorageApiProto {
return builder.build();
}
+ /**
+ * Forwards {@param changeSequenceNum} to {@link
#messageFromGenericRecord(Descriptor,
+ * GenericRecord, String, String)} via {@link Long#toHexString}.
+ */
+ public static DynamicMessage messageFromGenericRecord(
+ Descriptor descriptor,
+ GenericRecord record,
+ @Nullable String changeType,
+ long changeSequenceNum) {
+ return messageFromGenericRecord(
+ descriptor, record, changeType, Long.toHexString(changeSequenceNum));
+ }
+
/**
* Given an Avro {@link GenericRecord} object, returns a protocol-buffer
message that can be used
* to write data using the BigQuery Storage streaming API.
@@ -174,7 +187,7 @@ public class AvroGenericRecordToStorageApiProto {
Descriptor descriptor,
GenericRecord record,
@Nullable String changeType,
- long changeSequenceNum) {
+ @Nullable String changeSequenceNum) {
Schema schema = record.getSchema();
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (Schema.Field field : schema.getFields()) {
@@ -195,7 +208,7 @@ public class AvroGenericRecordToStorageApiProto {
builder.setField(
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
- changeSequenceNum);
+
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(changeSequenceNum));
}
return builder.build();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index d91ddd6843c..4275125ef16 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -143,12 +143,24 @@ public class BeamRowToStorageApiProto {
((EnumerationType)
logicalType).toString((EnumerationType.Value) value))
.build();
+ /**
+   * Forwards {@param changeSequenceNum} to {@link
#messageFromBeamRow(Descriptor, Row, String,
+ * String)} via {@link Long#toHexString}.
+ */
+ public static DynamicMessage messageFromBeamRow(
+ Descriptor descriptor, Row row, @Nullable String changeType, long
changeSequenceNum) {
+ return messageFromBeamRow(descriptor, row, changeType,
Long.toHexString(changeSequenceNum));
+ }
+
/**
* Given a Beam {@link Row} object, returns a protocol-buffer message that
can be used to write
* data using the BigQuery Storage streaming API.
*/
public static DynamicMessage messageFromBeamRow(
- Descriptor descriptor, Row row, @Nullable String changeType, long
changeSequenceNum) {
+ Descriptor descriptor,
+ Row row,
+ @Nullable String changeType,
+ @Nullable String changeSequenceNum) {
Schema beamSchema = row.getSchema();
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (int i = 0; i < row.getFieldCount(); ++i) {
@@ -170,7 +182,7 @@ public class BeamRowToStorageApiProto {
builder.setField(
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
- changeSequenceNum);
+
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(changeSequenceNum));
}
return builder.build();
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
index b0a6d460934..bd9d5d2a4ab 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutation.java
@@ -17,14 +17,16 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
import com.google.api.services.bigquery.model.TableRow;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
/**
* A convenience class for applying row updates to BigQuery using {@link
@@ -43,7 +45,12 @@ public abstract class RowMutation {
}
public static class RowMutationCoder extends AtomicCoder<RowMutation> {
+ private static final TableRowJsonCoder ROW_JSON_CODER =
TableRowJsonCoder.of();
private static final RowMutationCoder INSTANCE = new RowMutationCoder();
+ private static final VarIntCoder INT_CODER = VarIntCoder.of();
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final RowMutationInformation.MutationType[] MUTATION_TYPES =
+ RowMutationInformation.MutationType.values();
public static RowMutationCoder of() {
return INSTANCE;
@@ -51,19 +58,21 @@ public abstract class RowMutation {
@Override
public void encode(RowMutation value, OutputStream outStream) throws
IOException {
- TableRowJsonCoder.of().encode(value.getTableRow(), outStream);
- VarIntCoder.of()
- .encode(value.getMutationInformation().getMutationType().ordinal(),
outStream);
-
VarLongCoder.of().encode(value.getMutationInformation().getSequenceNumber(),
outStream);
+ ROW_JSON_CODER.encode(value.getTableRow(), outStream);
+ RowMutationInformation mutationInformation =
value.getMutationInformation();
+ INT_CODER.encode(mutationInformation.getMutationType().ordinal(),
outStream);
+ STRING_CODER.encode(mutationInformation.getChangeSequenceNumber(),
outStream);
}
@Override
public RowMutation decode(InputStream inStream) throws IOException {
+ TableRow tableRow = ROW_JSON_CODER.decode(inStream);
+ int mutationTypeOrdinal = INT_CODER.decode(inStream);
+ checkState(mutationTypeOrdinal >= 0 && mutationTypeOrdinal <
MUTATION_TYPES.length);
+ RowMutationInformation.MutationType mutationType =
MUTATION_TYPES[mutationTypeOrdinal];
+ String changeSequenceNumber = STRING_CODER.decode(inStream);
return RowMutation.of(
- TableRowJsonCoder.of().decode(inStream),
- RowMutationInformation.of(
-
RowMutationInformation.MutationType.values()[VarIntCoder.of().decode(inStream)],
- VarLongCoder.of().decode(inStream)));
+ tableRow, RowMutationInformation.of(mutationType,
changeSequenceNumber));
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
index cdbf9277d86..18905d149b9 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformation.java
@@ -17,7 +17,11 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
/**
* This class indicates how to apply a row update to BigQuery. A sequence
number must always be
@@ -37,10 +41,111 @@ public abstract class RowMutationInformation {
public abstract MutationType getMutationType();
- // The sequence number used
- public abstract long getSequenceNumber();
+ /**
+ * The sequence number used to drive the order of applied row mutations.
@deprecated {@link
+   * #getChangeSequenceNumber()} replaces this field because the BigQuery API
supports the use of
+   * a string instead.
+ */
+ @Deprecated
+ @Nullable
+ public abstract Long getSequenceNumber();
+
+ /**
+ * The value supplied to the BigQuery {@code _CHANGE_SEQUENCE_NUMBER}
pseudo-column. See {@link
+ * #of(MutationType, String)} for more details.
+ */
+ public abstract String getChangeSequenceNumber();
+ /**
+ * Instantiate {@link RowMutationInformation} with {@link MutationType} and
the {@param
+ * sequenceNumber}. @deprecated - instantiates {@link
RowMutationInformation} via {@link
+ * #of(MutationType, String)} forwarding the {@param sequenceNumber} value
using {@link
+ * Long#toHexString(long)}. {@param sequenceNumber} values {@code < 0} will
throw an error.
+ */
+ @Deprecated
public static RowMutationInformation of(MutationType mutationType, long
sequenceNumber) {
- return new AutoValue_RowMutationInformation(mutationType, sequenceNumber);
+ checkArgument(sequenceNumber >= 0, "sequenceNumber must be non-negative");
+ return new AutoValue_RowMutationInformation(
+ mutationType, null, Long.toHexString(sequenceNumber));
+ }
+
+ /**
+ * Instantiate {@link RowMutationInformation} with {@link MutationType} and
the {@param
+ * changeSequenceNumber}, which sets the BigQuery API {@code
_CHANGE_SEQUENCE_NUMBER} pseudo
+ * column, enabling custom user-supplied ordering of {@link RowMutation}s.
+ *
+ * <p>Requirements for the {@param changeSequenceNumber}:
+ *
+ * <ul>
+ * <li>fixed format {@code String} in hexadecimal format
+ * <li><strong>do not use hexadecimals encoded from negative
numbers</strong>
+ * <li>each hexadecimal string separated into sections by forward slash:
{@code /}
+ * <li>up to four sections allowed
+ * <li>each section is limited to {@code 16} hexadecimal characters:
{@code 0-9}, {@code A-F},
+ * or {@code a-f}
+   *   <li>The allowable range supported is values between {@code 0/0/0/0}
and {@code
+ * FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF}
+ * </ul>
+ *
+ * <p>Below are some {@param changeSequenceNumber} scenarios:
+ *
+ * <table>
+ * <tr>
+ * <th>Record #1: {@param changeSequenceNumber}</th>
+ * <th>Record #2: {@param changeSequenceNumber}</th>
+ * <th>BigQuery API compares as</th>
+ * </tr>
+ * <tr>
+ * <td>{@code "B"}</td>
+ * <td>{@code "ABC"}</td>
+ * <td>Record #2 is considered the latest record: {@code 'ABC' > 'B'
(i.e. '2748' > '11')}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code "FFF/B"}</td>
+ * <td>{@code "FFF/ABC"}</td>
+   *     <td>Record #2 is considered the latest record: {@code "FFF/ABC" >
"FFF/B" (i.e. "4095/2748" > "4095/11")}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code "BA/FFFFFFFF"}</td>
+ * <td>{@code "ABC"}</td>
+ * <td>Record #2 is considered the latest record: {@code "ABC" >
"BA/FFFFFFFF" (i.e. "2748" > "186/4294967295")}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code "FFF/ABC"}</td>
+ * <td>{@code "ABC"}</td>
+ * <td>Record #1 is considered the latest record: {@code "FFF/ABC" >
"ABC" (i.e. "4095/2748" > "2748")}</td>
+ * </tr>
+ * <tr>
+ * <td>{@code "FFF"}</td>
+ * <td>{@code "FFF"}</td>
+   *     <td>Record #1 and #2 change sequence numbers are identical; BigQuery
uses system ingestion time to take precedence over previously ingested
records.</td>
+ * </tr>
+ * </table>
+ *
+ * <p>Below are some code examples.
+ *
+ * <ul>
+ * <li>{@code RowMutationInformation.of(UPSERT, "FFF/ABC")}
+ * <li>Using Apache Commons {@link
org.apache.commons.codec.binary.Hex#encodeHexString} (Java
+ * 17+ users can use {@code HexFormat}){@code
RowMutationInformation.of(UPSERT,
+ * Hex.encodeHexString("2024-04-30 11:19:44
UTC".getBytes(StandardCharsets.UTF_8)))}
+ * <li>Using {@link Long#toHexString}: {@code
RowMutationInformation.of(DELETE,
+ * Long.toHexString(123L))}
+ * </ul>
+ *
+ * See <a
+ *
href="https://cloud.google.com/bigquery/docs/change-data-capture#manage_custom_ordering">
+ *
https://cloud.google.com/bigquery/docs/change-data-capture#manage_custom_ordering
</a> for more
+ * details.
+ */
+ public static RowMutationInformation of(MutationType mutationType, String
changeSequenceNumber) {
+ checkArgument(
+ !Strings.isNullOrEmpty(changeSequenceNumber), "changeSequenceNumber
must not be empty");
+ checkArgument(
+
StorageApiCDC.EXPECTED_SQN_PATTERN.asPredicate().test(changeSequenceNumber),
+ String.format(
+ "changeSequenceNumber: %s does not match expected pattern: %s",
+ changeSequenceNumber,
StorageApiCDC.EXPECTED_SQN_PATTERN.pattern()));
+ return new AutoValue_RowMutationInformation(mutationType, null,
changeSequenceNumber);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
index 48cb9bb902c..f9240d83306 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiCDC.java
@@ -36,6 +36,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
*/
import java.util.Set;
+import java.util.regex.Pattern;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
/** Constants and variables for CDC support. */
@@ -43,4 +44,12 @@ public class StorageApiCDC {
public static final String CHANGE_SQN_COLUMN = "_CHANGE_SEQUENCE_NUMBER";
public static final String CHANGE_TYPE_COLUMN = "_CHANGE_TYPE";
public static final Set<String> COLUMNS =
ImmutableSet.of(CHANGE_TYPE_COLUMN, CHANGE_SQN_COLUMN);
+
+ /**
+ * Expected valid pattern for a {@link #CHANGE_SQN_COLUMN} value for use
with BigQuery's {@code
+ * _CHANGE_SEQUENCE_NUMBER} format. See {@link
+ * RowMutationInformation#of(RowMutationInformation.MutationType, String)}
for more details.
+ */
+ public static final Pattern EXPECTED_SQN_PATTERN =
+ Pattern.compile("^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$");
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 7821a7d9199..70ecb06d5b8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -86,11 +86,11 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT
extends @NonNull Obje
public StorageApiWritePayload toMessage(
T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception {
String changeType = null;
- long changeSequenceNum = -1;
+ String changeSequenceNum = null;
Descriptor descriptorToUse = descriptor;
if (rowMutationInformation != null) {
changeType = rowMutationInformation.getMutationType().toString();
- changeSequenceNum = rowMutationInformation.getSequenceNumber();
+ changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
descriptorToUse = Preconditions.checkStateNotNull(cdcDescriptor);
}
Message msg =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
index 79141d73a39..c96bb4ce752 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java
@@ -81,14 +81,13 @@ class StorageApiDynamicDestinationsGenericRecord<T,
DestinationT extends @NonNul
@Override
@SuppressWarnings("nullness")
public StorageApiWritePayload toMessage(
- T element, @javax.annotation.Nullable RowMutationInformation
rowMutationInformation)
- throws Exception {
+ T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception {
String changeType = null;
- long changeSequenceNum = -1;
+ String changeSequenceNum = null;
Descriptor descriptorToUse = descriptor;
if (rowMutationInformation != null) {
changeType = rowMutationInformation.getMutationType().toString();
- changeSequenceNum = rowMutationInformation.getSequenceNumber();
+ changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
descriptorToUse = cdcDescriptor;
}
Message msg =
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index eb93d7c398f..264dac34473 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -161,11 +161,11 @@ public class StorageApiDynamicDestinationsTableRow<T,
DestinationT extends @NonN
TableRow tableRow = formatFunction.apply(element);
String changeType = null;
- long changeSequenceNum = -1;
+ String changeSequenceNum = null;
Descriptor descriptorToUse = descriptor;
if (rowMutationInformation != null) {
changeType = rowMutationInformation.getMutationType().toString();
- changeSequenceNum = rowMutationInformation.getSequenceNumber();
+ changeSequenceNum = rowMutationInformation.getChangeSequenceNumber();
descriptorToUse = Preconditions.checkStateNotNull(cdcDescriptor);
}
// If autoSchemaUpdates==true, then we allow unknown values at this step
and insert them into
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 4d714aaaf77..07457e72050 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -480,7 +480,7 @@ public class TableRowToStorageApiProto {
boolean allowMissingRequiredFields,
@Nullable TableRow unknownFields,
@Nullable String changeType,
- long changeSequenceNum)
+ @Nullable String changeSequenceNum)
throws SchemaConversionException {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (final Map.Entry<String, Object> entry : map.entrySet()) {
@@ -542,7 +542,7 @@ public class TableRowToStorageApiProto {
builder.setField(
Preconditions.checkStateNotNull(
descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
- changeSequenceNum);
+ Preconditions.checkStateNotNull(changeSequenceNum));
}
try {
@@ -553,6 +553,32 @@ public class TableRowToStorageApiProto {
}
}
+ /**
+ * Forwards {@param changeSequenceNum} to {@link
#messageFromTableRow(SchemaInformation,
+ * Descriptor, TableRow, boolean, boolean, TableRow, String, String)} via
{@link
+ * Long#toHexString}.
+ */
+ public static DynamicMessage messageFromTableRow(
+ SchemaInformation schemaInformation,
+ Descriptor descriptor,
+ TableRow tableRow,
+ boolean ignoreUnknownValues,
+ boolean allowMissingRequiredFields,
+ final @Nullable TableRow unknownFields,
+ @Nullable String changeType,
+ long changeSequenceNum)
+ throws SchemaConversionException {
+ return messageFromTableRow(
+ schemaInformation,
+ descriptor,
+ tableRow,
+ ignoreUnknownValues,
+ allowMissingRequiredFields,
+ unknownFields,
+ changeType,
+ Long.toHexString(changeSequenceNum));
+ }
+
/**
* Given a BigQuery TableRow, returns a protocol-buffer message that can be
used to write data
* using the BigQuery Storage API.
@@ -566,7 +592,7 @@ public class TableRowToStorageApiProto {
boolean allowMissingRequiredFields,
final @Nullable TableRow unknownFields,
@Nullable String changeType,
- long changeSequenceNum)
+ @Nullable String changeSequenceNum)
throws SchemaConversionException {
@Nullable Object fValue = tableRow.get("f");
if (fValue instanceof List) {
@@ -638,7 +664,7 @@ public class TableRowToStorageApiProto {
builder.setField(
Preconditions.checkStateNotNull(
descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)),
- changeSequenceNum);
+ Preconditions.checkStateNotNull(changeSequenceNum));
}
// If there are unknown fields, copy them into the output.
@@ -743,7 +769,7 @@ public class TableRowToStorageApiProto {
fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
fieldDescriptorBuilder =
fieldDescriptorBuilder.setName(StorageApiCDC.CHANGE_SQN_COLUMN);
fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(i++);
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+ fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_STRING);
fieldDescriptorBuilder =
fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
descriptorBuilder.addField(fieldDescriptorBuilder.build());
}
@@ -1013,7 +1039,7 @@ public class TableRowToStorageApiProto {
allowMissingRequiredFields,
getUnknownNestedFields.get(),
null,
- -1);
+ null);
} else if (value instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
value);
@@ -1025,7 +1051,7 @@ public class TableRowToStorageApiProto {
allowMissingRequiredFields,
getUnknownNestedFields.get(),
null,
- -1);
+ null);
}
break;
default:
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1e746d7f96b..83c63d1c75f 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -661,7 +661,8 @@ public class FakeDatasetService implements DatasetService,
WriteStreamService, S
}
fieldDescriptor =
protoDescriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN);
if (fieldDescriptor != null) {
- changeSequenceNum = (long) msg.getField(fieldDescriptor);
+ String changeSequenceNumHex = (String)
msg.getField(fieldDescriptor);
+ changeSequenceNum = Long.parseUnsignedLong(changeSequenceNumHex,
16);
}
Stream.Entry.UpdateType insertType =
Stream.Entry.UpdateType.INSERT;
if (insertTypeStr != null) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index ac73a0122d8..3a4dcb02ebd 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -488,7 +488,8 @@ public class AvroGenericRecordToStorageApiProtoTest {
descriptor.getFields().stream()
.collect(Collectors.toMap(Descriptors.FieldDescriptor::getName,
Functions.identity()));
assertEquals("UPDATE",
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_TYPE_COLUMN)));
- assertEquals(42L,
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
+ assertEquals(
+ Long.toHexString(42L),
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index af025d9f030..5c43688a6ef 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -411,7 +411,9 @@ public class BeamRowToStorageApiProtoTest {
assertBaseRecord(nestedMsg);
assertEquals(
"UPDATE",
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_TYPE_COLUMN)));
- assertEquals(42L,
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)));
+ assertEquals(
+ Long.toHexString(42L),
+
msg.getField(descriptor.findFieldByName(StorageApiCDC.CHANGE_SQN_COLUMN)));
}
@Test
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index f42734af767..d303948fe44 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -3596,28 +3596,36 @@ public class BigQueryIOWriteTest implements
Serializable {
Lists.newArrayList(
RowMutation.of(
new TableRow().set("key1", "foo0").set("key2",
"bar0").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(0L))),
RowMutation.of(
new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(0L))),
RowMutation.of(
new TableRow().set("key1", "foo0").set("key2",
"bar0").set("value", "2"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 1)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(1L))),
RowMutation.of(
new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.DELETE,
Long.toHexString(1L))),
RowMutation.of(
new TableRow().set("key1", "foo3").set("key2",
"bar3").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(0L))),
RowMutation.of(
new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "3"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 2)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(2L))),
RowMutation.of(
new TableRow().set("key1", "foo4").set("key2",
"bar4").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, 0)),
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
Long.toHexString(0L))),
RowMutation.of(
new TableRow().set("key1", "foo4").set("key2",
"bar4").set("value", "1"),
-
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, 1)));
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.DELETE,
Long.toHexString(1L))));
BigQueryIO.Write<RowMutation> write =
BigQueryIO.applyRowMutations()
@@ -3655,7 +3663,7 @@ public class BigQueryIOWriteTest implements Serializable {
new TableFieldSchema().setName("key2").setType("STRING"),
new TableFieldSchema().setName("value").setType("STRING"),
new
TableFieldSchema().setName("updateType").setType("STRING"),
- new TableFieldSchema().setName("sqn").setType("INT64")));
+ new TableFieldSchema().setName("sqn").setType("STRING")));
Table fakeTable = new Table();
TableReference ref =
@@ -3675,7 +3683,7 @@ public class BigQueryIOWriteTest implements Serializable {
.optionalString("key2")
.optionalString("value")
.optionalString("updateType")
- .optionalLong("sqn")
+ .requiredString("sqn")
.endRecord();
List<GenericRecord> items =
@@ -3685,56 +3693,56 @@ public class BigQueryIOWriteTest implements
Serializable {
.set("key2", "bar0")
.set("value", "1")
.set("updateType", "UPSERT")
- .set("sqn", 0L)
+ .set("sqn", Long.toHexString(0L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo1")
.set("key2", "bar1")
.set("value", "1")
.set("updateType", "UPSERT")
- .set("sqn", 0L)
+ .set("sqn", Long.toHexString(0L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo0")
.set("key2", "bar0")
.set("value", "2")
.set("updateType", "UPSERT")
- .set("sqn", 1L)
+ .set("sqn", Long.toHexString(1L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo1")
.set("key2", "bar1")
.set("value", "1")
.set("updateType", "DELETE")
- .set("sqn", 1L)
+ .set("sqn", Long.toHexString(1L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo3")
.set("key2", "bar3")
.set("value", "1")
.set("updateType", "UPSERT")
- .set("sqn", 0L)
+ .set("sqn", Long.toHexString(0L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo1")
.set("key2", "bar1")
.set("value", "3")
.set("updateType", "UPSERT")
- .set("sqn", 2L)
+ .set("sqn", Long.toHexString(2L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo4")
.set("key2", "bar4")
.set("value", "1")
.set("updateType", "UPSERT")
- .set("sqn", 0L)
+ .set("sqn", Long.toHexString(0L))
.build(),
new GenericRecordBuilder(avroSchema)
.set("key1", "foo4")
.set("key2", "bar4")
.set("value", "1")
.set("updateType", "DELETE")
- .set("sqn", 1L)
+ .set("sqn", Long.toHexString(1L))
.build());
BigQueryIO.Write<GenericRecord> write =
@@ -3744,10 +3752,12 @@ public class BigQueryIOWriteTest implements
Serializable {
.withSchema(tableSchema)
.withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
.withRowMutationInformationFn(
- r ->
- RowMutationInformation.of(
-
RowMutationInformation.MutationType.valueOf(r.get("updateType").toString()),
- (long) r.get("sqn")))
+ r -> {
+ RowMutationInformation.MutationType mutationType =
+
RowMutationInformation.MutationType.valueOf(r.get("updateType").toString());
+ String sqn = r.get("sqn").toString();
+ return RowMutationInformation.of(mutationType, sqn);
+ })
.withoutValidation()
.withTestServices(fakeBqServices);
@@ -3761,19 +3771,19 @@ public class BigQueryIOWriteTest implements
Serializable {
.set("key2", "bar0")
.set("value", "2")
.set("updatetype", "UPSERT")
- .set("sqn", "1"),
+ .set("sqn", Long.toHexString(1)),
new TableRow()
.set("key1", "foo1")
.set("key2", "bar1")
.set("value", "3")
.set("updatetype", "UPSERT")
- .set("sqn", "2"),
+ .set("sqn", Long.toHexString(2)),
new TableRow()
.set("key1", "foo3")
.set("key2", "bar3")
.set("value", "1")
.set("updatetype", "UPSERT")
- .set("sqn", "0"));
+ .set("sqn", Long.toHexString(0)));
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
@@ -3793,7 +3803,7 @@ public class BigQueryIOWriteTest implements Serializable {
new TableFieldSchema().setName("key2").setType("STRING"),
new TableFieldSchema().setName("value").setType("STRING"),
new
TableFieldSchema().setName("updateType").setType("STRING"),
- new TableFieldSchema().setName("sqn").setType("INT64")));
+ new TableFieldSchema().setName("sqn").setType("STRING")));
Table fakeTable = new Table();
TableReference ref =
@@ -3812,7 +3822,7 @@ public class BigQueryIOWriteTest implements Serializable {
.addNullableStringField("key2")
.addNullableStringField("value")
.addNullableStringField("updateType")
- .addNullableInt64Field("sqn")
+ .addNullableStringField("sqn")
.build();
List<Row> items =
@@ -3822,56 +3832,56 @@ public class BigQueryIOWriteTest implements
Serializable {
.withFieldValue("key2", "bar0")
.withFieldValue("value", "1")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 0L)
+ .withFieldValue("sqn", Long.toHexString(0L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo1")
.withFieldValue("key2", "bar1")
.withFieldValue("value", "1")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 0L)
+ .withFieldValue("sqn", Long.toHexString(0L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo0")
.withFieldValue("key2", "bar0")
.withFieldValue("value", "2")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 1L)
+ .withFieldValue("sqn", Long.toHexString(1L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo1")
.withFieldValue("key2", "bar1")
.withFieldValue("value", "1")
.withFieldValue("updateType", "DELETE")
- .withFieldValue("sqn", 1L)
+ .withFieldValue("sqn", Long.toHexString(1L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo3")
.withFieldValue("key2", "bar3")
.withFieldValue("value", "1")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 0L)
+ .withFieldValue("sqn", Long.toHexString(0L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo1")
.withFieldValue("key2", "bar1")
.withFieldValue("value", "3")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 2L)
+ .withFieldValue("sqn", Long.toHexString(2L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo4")
.withFieldValue("key2", "bar4")
.withFieldValue("value", "1")
.withFieldValue("updateType", "UPSERT")
- .withFieldValue("sqn", 0L)
+ .withFieldValue("sqn", Long.toHexString(0L))
.build(),
Row.withSchema(beamSchema)
.withFieldValue("key1", "foo4")
.withFieldValue("key2", "bar4")
.withFieldValue("value", "1")
.withFieldValue("updateType", "DELETE")
- .withFieldValue("sqn", 1L)
+ .withFieldValue("sqn", Long.toHexString(1L))
.build());
BigQueryIO.Write<Row> write =
@@ -3885,7 +3895,7 @@ public class BigQueryIOWriteTest implements Serializable {
r ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(r.getString("updateType")),
- r.getInt64("sqn")))
+ r.getString("sqn")))
.withoutValidation()
.withTestServices(fakeBqServices);
@@ -3899,19 +3909,19 @@ public class BigQueryIOWriteTest implements
Serializable {
.set("key2", "bar0")
.set("value", "2")
.set("updatetype", "UPSERT")
- .set("sqn", "1"),
+ .set("sqn", Long.toHexString(1)),
new TableRow()
.set("key1", "foo1")
.set("key2", "bar1")
.set("value", "3")
.set("updatetype", "UPSERT")
- .set("sqn", "2"),
+ .set("sqn", Long.toHexString(2)),
new TableRow()
.set("key1", "foo3")
.set("key2", "bar3")
.set("value", "1")
.set("updatetype", "UPSERT")
- .set("sqn", "0"));
+ .set("sqn", Long.toHexString(0)));
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
new file mode 100644
index 00000000000..88d410a106a
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/RowMutationInformationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class RowMutationInformationTest {
+ @Test
+ public void givenLong_SQN_EQ_Zero_encodesAndInstantiates() {
+ long sqn = 0L;
+ RowMutationInformation got =
+ RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
sqn);
+ assertNotNull(got.getChangeSequenceNumber());
+ assertEquals("0", got.getChangeSequenceNumber());
+ }
+
+ @Test
+ public void givenLong_SQN_GT_Zero_encodesAndInstantiates() {
+ long sqn = 6L;
+ RowMutationInformation got =
+ RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
sqn);
+ assertNotNull(got.getChangeSequenceNumber());
+ assertEquals("6", got.getChangeSequenceNumber());
+ }
+
+ @Test
+ public void givenLong_SQN_EQ_Max_encodesAndInstantiates() {
+ long sqn = Long.MAX_VALUE;
+ RowMutationInformation got =
+ RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
sqn);
+ assertNotNull(got.getChangeSequenceNumber());
+ assertEquals("7fffffffffffffff", got.getChangeSequenceNumber());
+ }
+
+ @Test
+ public void givenLong_SQL_LT_Zero_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, -1L));
+ assertEquals("sequenceNumber must be non-negative", error.getMessage());
+ }
+
+ @Test
+ public void givenSimpleHex_encodesAndInstantiates() {
+ String sqn = "FFF/ABC/012/AAA";
+ RowMutationInformation got =
+ RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
sqn);
+ assertNotNull(got.getChangeSequenceNumber());
+ assertEquals(sqn, got.getChangeSequenceNumber());
+ }
+
+ @Test
+ public void givenTooManySegments_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
"0/0/0/0/0"));
+ assertEquals(
+ "changeSequenceNumber: 0/0/0/0/0 does not match expected pattern:
^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+ error.getMessage());
+ }
+
+ @Test
+ public void givenEmptyString_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, ""));
+ assertEquals("changeSequenceNumber must not be empty", error.getMessage());
+ }
+
+ @Test
+ public void givenEmptySegment_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT,
"0/1//3"));
+ assertEquals(
+ "changeSequenceNumber: 0/1//3 does not match expected pattern:
^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+ error.getMessage());
+ }
+
+ @Test
+ public void givenSingleSegmentTooLarge_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
"12345678901234567"));
+ assertEquals(
+ "changeSequenceNumber: 12345678901234567 does not match expected
pattern: ^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+ error.getMessage());
+ }
+
+ @Test
+ public void givenAddlSegmentTooLarge_throws() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowMutationInformation.of(
+ RowMutationInformation.MutationType.UPSERT,
"0/12345678901234567"));
+ assertEquals(
+ "changeSequenceNumber: 0/12345678901234567 does not match expected
pattern: ^([0-9A-Fa-f]{1,16})(/([0-9A-Fa-f]{1,16})){0,3}$",
+ error.getMessage());
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
index f8cc797a87c..10e6afed6dc 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkRowUpdateIT.java
@@ -70,7 +70,7 @@ public class StorageApiSinkRowUpdateIT {
}
@Test
- public void testCdc() throws Exception {
+ public void testCdcUsingLongSeqNum() throws Exception {
TableSchema tableSchema =
new TableSchema()
.setFields(
@@ -130,6 +130,67 @@ public class StorageApiSinkRowUpdateIT {
assertRowsWritten(tableSpec, expected);
}
+ @Test
+ public void testCdcUsingHexSequenceNum() throws Exception {
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("key1").setType("STRING"),
+ new TableFieldSchema().setName("key2").setType("STRING"),
+ new
TableFieldSchema().setName("value").setType("STRING")));
+
+ List<RowMutation> items =
+ Lists.newArrayList(
+ RowMutation.of(
+ new TableRow().set("key1", "foo0").set("key2",
"bar0").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo0").set("key2",
"bar0").set("value", "2"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/1")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE, "AAA/1")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo3").set("key2",
"bar3").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "3"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/2")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo4").set("key2",
"bar4").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.UPSERT, "AAA/0")),
+ RowMutation.of(
+ new TableRow().set("key1", "foo4").set("key2",
"bar4").set("value", "1"),
+
RowMutationInformation.of(RowMutationInformation.MutationType.DELETE,
"AAA/1")));
+
+ List<String> primaryKey = Lists.newArrayList("key1", "key2");
+ String tableSpec = getTablespec();
+ Pipeline p = Pipeline.create();
+ p.apply("Create rows", Create.of(items))
+ .apply(
+ "Apply updates",
+ BigQueryIO.applyRowMutations()
+ .to(tableSpec)
+ .withSchema(tableSchema)
+ .withPrimaryKey(primaryKey)
+ .withClustering(new Clustering().setFields(primaryKey))
+ .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
+
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
+
+ p.run();
+
+ List<TableRow> expected =
+ Lists.newArrayList(
+ new TableRow().set("key1", "foo0").set("key2",
"bar0").set("value", "2"),
+ new TableRow().set("key1", "foo1").set("key2",
"bar1").set("value", "3"),
+ new TableRow().set("key1", "foo3").set("key2",
"bar3").set("value", "1"));
+ assertRowsWritten(tableSpec, expected);
+ }
+
private void assertRowsWritten(String tableSpec, Iterable<TableRow> expected)
throws IOException, InterruptedException {
List<TableRow> queryResponse =
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index aae1c2096cc..847ede9df36 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -1638,6 +1638,7 @@ public class TableRowToStorageApiProtoTest {
assertBaseRecord((DynamicMessage)
msg.getField(fieldDescriptors.get("nestedvaluenof1")), false);
assertBaseRecord((DynamicMessage)
msg.getField(fieldDescriptors.get("nestedvaluenof2")), false);
assertEquals("UPDATE",
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_TYPE_COLUMN)));
- assertEquals(42L,
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
+ assertEquals(
+ Long.toHexString(42L),
msg.getField(fieldDescriptors.get(StorageApiCDC.CHANGE_SQN_COLUMN)));
}
}