jackdingilian commented on code in PR #31376:
URL: https://github.com/apache/beam/pull/31376#discussion_r1613798994
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -2284,25 +2286,77 @@ public ReadChangeStream
withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}
+ /**
Review Comment:
We should note that this option also disables metadata table creation / update,
so that step will need to be done explicitly whenever this option is enabled
and the metadata table has not already been created or is not up to date.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -2284,25 +2286,77 @@ public ReadChangeStream
withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}
+ /**
+ * Disables validation that the table being read and the metadata table
exist, and that the app
+ * profile used is single cluster and single row transaction enabled. Set
this option if the
+ * caller does not have additional Bigtable permissions to validate the
configurations.
+ */
+ public ReadChangeStream withoutValidation() {
+ BigtableConfig config = getBigtableConfig();
+ BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
+ return toBuilder()
+ .setBigtableConfig(config.withValidate(false))
+
.setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false))
+ .setValidateConfig(false)
+ .build();
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ BigtableServiceFactory factory = new BigtableServiceFactory();
+ if (getBigtableConfig().getValidate()) {
+ try {
+ checkArgument(
+ factory.checkTableExists(getBigtableConfig(), options,
getTableId()),
+ "Change Stream table %s does not exist",
+ getTableId());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ // Validate the app profile is single cluster and allows single row
transactions.
+ private void validateAppProfile(
+ MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
+ checkArgument(metadataTableAdminDao != null);
+ checkArgument(
+
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId),
+ "App profile id '"
+ + appProfileId
+ + "' provided to access metadata table needs to use
single-cluster routing policy"
+ + " and allow single-row transactions.");
+ }
+
+ // Update metadata table schema if allowed and required.
+ private void createOrUpdateMetadataTable(
+ MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
+ boolean shouldCreateOrUpdateMetadataTable = true;
+ if (getCreateOrUpdateMetadataTable() != null) {
+ shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
+ }
+ // Only try to create or update metadata table if option is set to true.
Otherwise, just
+ // check if the table exists.
+ if (shouldCreateOrUpdateMetadataTable &&
metadataTableAdminDao.createMetadataTable()) {
+ LOG.info("Created metadata table: " + metadataTableId);
+ }
+ }
+
@Override
public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin
input) {
checkArgument(
getBigtableConfig() != null,
"BigtableIO ReadChangeStream is missing required configurations
fields.");
- checkArgument(
- getBigtableConfig().getProjectId() != null, "Missing required
projectId field.");
- checkArgument(
- getBigtableConfig().getInstanceId() != null, "Missing required
instanceId field.");
+ getBigtableConfig().validate();
checkArgument(getTableId() != null, "Missing required tableId field.");
BigtableConfig bigtableConfig = getBigtableConfig();
Review Comment:
Nit: move this declaration above the two earlier calls to getBigtableConfig()
in this method, so that those calls can use the local variable instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]