thiagotnunes commented on code in PR #24390:
URL: https://github.com/apache/beam/pull/24390#discussion_r1035363580
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,30 @@
*/
public class NameGenerator {
- private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
- "CDC_Partitions_Metadata_%s_%s";
+ private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+ private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;
Review Comment:
Since we only have one length, maybe drop the `POSTGRES`?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -98,11 +100,13 @@ public class PartitionMetadataAdminDao {
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
- String tableName) {
+ String tableName,
+ Dialect metadataDatabaseDialect) {
Review Comment:
nit: since this class is named `PartitionMetadataAdminDao` I think we can
name this field / variable just `dialect`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java:
##########
@@ -25,23 +25,30 @@
*/
public class NameGenerator {
- private static final String PARTITION_METADATA_TABLE_NAME_FORMAT =
- "CDC_Partitions_Metadata_%s_%s";
+ private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";
+ private static final int MAX_POSTGRES_TABLE_NAME_LENGTH = 63;
/**
* Generates an unique name for the partition metadata table in the form of
{@code
- * "CDC_Partitions_Metadata_<databaseId>_<uuid>"}.
+ * "Metadata_<databaseId>_<uuid>"}.
*
* @param databaseId The database id where the table will be created
* @return the unique generated name of the partition metadata table
*/
public static String generatePartitionMetadataTableName(String databaseId) {
// Maximum Spanner table name length is 128 characters.
- // There are 25 characters in the name format.
+ // There are 11 characters in the name format.
// Maximum Spanner database ID length is 30 characters.
// UUID always generates a String with 36 characters.
- // 128 - (25 + 30 + 36) = 37 characters short of the limit
- return String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
- .replaceAll("-", "_");
+ // For GoogleSQL, 128 - (11 + 30 + 36) = 51 characters short of the limit
Review Comment:
Does this comment still make sense now that we have consolidated the size into a single limit?
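For illustration, a minimal sketch of how the method might read with one consolidated limit. The class name `NameGeneratorSketch`, the constant name `MAX_TABLE_NAME_LENGTH`, and the truncate-on-overflow strategy are assumptions made for this example, not something the diff above confirms. Note that truncating cuts into the UUID suffix, so uniqueness is weaker for long database ids.

```java
import java.util.UUID;

final class NameGeneratorSketch {
  private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s";

  // Hypothetical consolidated limit: 63 is the smaller of the two limits
  // that appear in the diff (128 in the GoogleSQL comment, 63 in the Postgres constant).
  private static final int MAX_TABLE_NAME_LENGTH = 63;

  static String generatePartitionMetadataTableName(String databaseId) {
    // The format text, a database id of up to 30 characters, and a
    // 36-character UUID can together exceed 63 characters, so the
    // result is truncated when needed (an assumed strategy).
    String fullName =
        String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID())
            .replaceAll("-", "_");
    return fullName.length() <= MAX_TABLE_NAME_LENGTH
        ? fullName
        : fullName.substring(0, MAX_TABLE_NAME_LENGTH);
  }
}
```

With this shape the comment only needs to explain why truncation can happen, rather than carry per-dialect arithmetic.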
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -83,30 +90,35 @@ public class ChangeStreamRecordMapper {
private static final String CHILD_PARTITIONS_COLUMN = "child_partitions";
private static final String PARENT_PARTITION_TOKENS_COLUMN =
"parent_partition_tokens";
private static final String TOKEN_COLUMN = "token";
+ private final Dialect spannerChangeStreamDatabaseDialect;
- ChangeStreamRecordMapper() {}
+ ChangeStreamRecordMapper(Dialect spannerChangeStreamDatabaseDialect) {
Review Comment:
nit: rename variable to `dialect`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -225,18 +246,50 @@ private Stream<ChangeStreamRecord> toChangeStreamRecord(
Stream.concat(dataChangeRecords, heartbeatRecords),
childPartitionsRecords);
}
+ ChangeStreamRecord toChangeStreamRecordJson(
+ PartitionMetadata partition, String row, ChangeStreamResultSetMetadata resultSetMetadata) {
+ Value.Builder valueBuilder = Value.newBuilder();
+ try {
+ JsonFormat.parser().ignoringUnknownFields().merge(row, valueBuilder);
Review Comment:
Do we need to create a new parser per record, or could we move this into the
constructor of this class?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java:
##########
@@ -53,16 +54,19 @@ public class PartitionMetadataDao {
private final String metadataTableName;
private final DatabaseClient databaseClient;
+ private final Dialect metadataDatabaseDialect;
/**
* Constructs a partition metadata dao object given the generated name of
the tables.
*
* @param metadataTableName the name of the partition metadata table
* @param databaseClient the {@link DatabaseClient} to perform queries
*/
- PartitionMetadataDao(String metadataTableName, DatabaseClient databaseClient) {
+ PartitionMetadataDao(
+ String metadataTableName, DatabaseClient databaseClient, Dialect metadataDatabaseDialect) {
this.metadataTableName = metadataTableName;
this.databaseClient = databaseClient;
+ this.metadataDatabaseDialect = metadataDatabaseDialect;
Review Comment:
nit: since this class is the `PartitionMetadataDao` I think we can name this
variable / field just `dialect`.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/mapper/ChangeStreamRecordMapper.java:
##########
@@ -260,6 +312,65 @@ private DataChangeRecord toDataChangeRecord(
changeStreamRecordMetadataFrom(partition, commitTimestamp,
resultSetMetadata));
}
+ private DataChangeRecord toDataChangeRecordJson(
+ PartitionMetadata partition, Value row, ChangeStreamResultSetMetadata resultSetMetadata) {
+ Value dataChangeRecordValue =
+ Optional.ofNullable(row.getStructValue().getFieldsMap().get(DATA_CHANGE_RECORD_COLUMN))
+ .orElseThrow(IllegalArgumentException::new);
+ Map<String, Value> valueMap = dataChangeRecordValue.getStructValue().getFieldsMap();
+ final String commitTimestamp =
+ Optional.ofNullable(valueMap.get(COMMIT_TIMESTAMP_COLUMN))
+ .orElseThrow(IllegalArgumentException::new)
+ .getStringValue();
+ return new DataChangeRecord(
+ partition.getPartitionToken(),
+ Timestamp.parseTimestamp(commitTimestamp),
+ Optional.ofNullable(valueMap.get(SERVER_TRANSACTION_ID_COLUMN))
+ .orElseThrow(IllegalArgumentException::new)
Review Comment:
Thanks
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java:
##########
@@ -50,11 +52,13 @@ public class ChangeStreamDao {
String changeStreamName,
DatabaseClient databaseClient,
RpcPriority rpcPriority,
- String jobName) {
+ String jobName,
+ Dialect spannerChangeStreamDatabaseDialect) {
Review Comment:
nit: since this is the `ChangeStreamDao` I think we can name this variable /
field just `dialect`
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java:
##########
@@ -190,4 +237,8 @@ public void deletePartitionMetadataTable() {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
}
+
+ private boolean isPostgres() {
Review Comment:
nit: should we have a similar method on the `ChangeStreamDao`?
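As a rough sketch of what that could look like, here is a small helper that keeps the dialect check in one place. Everything in it is hypothetical: the class name `DialectHelperSketch`, the `quoteIdentifier` method, and the nested `Dialect` enum, which is only a local stand-in for `com.google.cloud.spanner.Dialect` so that the example compiles on its own.

```java
final class DialectHelperSketch {
  // Local stand-in for com.google.cloud.spanner.Dialect (assumption for this sketch).
  enum Dialect {
    GOOGLE_STANDARD_SQL,
    POSTGRESQL
  }

  private final Dialect dialect;

  DialectHelperSketch(Dialect dialect) {
    this.dialect = dialect;
  }

  // Single place for the dialect check, mirroring the private isPostgres()
  // added to PartitionMetadataAdminDao in the hunk above.
  boolean isPostgres() {
    return dialect == Dialect.POSTGRESQL;
  }

  // Illustrative use of the check: PostgreSQL quotes identifiers with
  // double quotes, GoogleSQL with backticks.
  String quoteIdentifier(String identifier) {
    return isPostgres() ? "\"" + identifier + "\"" : "`" + identifier + "`";
  }
}
```

Having the same `isPostgres()` method on each DAO would keep the branching consistent and would pair naturally with naming the field just `dialect`.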
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.