jackdingilian commented on code in PR #31376:
URL: https://github.com/apache/beam/pull/31376#discussion_r1613798994


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
       return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
     }
 
+    /**

Review Comment:
   We should note that this disables metadata table creation and update, so that
must be done explicitly when this option is enabled (and the table hasn't been
created or isn't up to date).



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
       return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
     }
 
+    /**
+     * Disables validation that the table being read and the metadata table exists, and that the app
+     * profile used is single cluster and single row transaction enabled. Set this option if the
+     * caller does not have additional Bigtable permissions to validate the configurations.
+     */
+    public ReadChangeStream withoutValidation() {
+      BigtableConfig config = getBigtableConfig();
+      BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
+      return toBuilder()
+          .setBigtableConfig(config.withValidate(false))
+          .setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false))
+          .setValidateConfig(false)
+          .build();
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      BigtableServiceFactory factory = new BigtableServiceFactory();
+      if (getBigtableConfig().getValidate()) {
+        try {
+          checkArgument(
+              factory.checkTableExists(getBigtableConfig(), options, getTableId()),
+              "Change Stream table %s does not exist",
+              getTableId());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    // Validate the app profile is single cluster and allows single row transactions.
+    private void validateAppProfile(
+        MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
+      checkArgument(metadataTableAdminDao != null);
+      checkArgument(
+          metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId),
+          "App profile id '"
+              + appProfileId
+              + "' provided to access metadata table needs to use single-cluster routing policy"
+              + " and allow single-row transactions.");
+    }
+
+    // Update metadata table schema if allowed and required.
+    private void createOrUpdateMetadataTable(
+        MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
+      boolean shouldCreateOrUpdateMetadataTable = true;
+      if (getCreateOrUpdateMetadataTable() != null) {
+        shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
+      }
+      // Only try to create or update metadata table if option is set to true. Otherwise, just
+      // check if the table exists.
+      if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
+        LOG.info("Created metadata table: " + metadataTableId);
+      }
+    }
+
     @Override
     public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
       checkArgument(
           getBigtableConfig() != null,
           "BigtableIO ReadChangeStream is missing required configurations fields.");
-      checkArgument(
-          getBigtableConfig().getProjectId() != null, "Missing required projectId field.");
-      checkArgument(
-          getBigtableConfig().getInstanceId() != null, "Missing required instanceId field.");
+      getBigtableConfig().validate();
       checkArgument(getTableId() != null, "Missing required tableId field.");
 
       BigtableConfig bigtableConfig = getBigtableConfig();

Review Comment:
   Nit: move this declaration above the two earlier calls to getBigtableConfig()
in this method, and use the local variable there instead.
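
   A minimal sketch of the shape this nit is asking for, using hypothetical
stand-in classes (`HoistConfigSketch`, `Config`, `getConfig()`) rather than the
real `ReadChangeStream` and `BigtableConfig`. The only point is that the getter
is read once into a local and every later check goes through that local:

```java
// Hypothetical stand-ins for illustration only; these are not the Beam classes.
public class HoistConfigSketch {

  /** Stand-in for a config object carrying the two required ids. */
  static final class Config {
    final String projectId;
    final String instanceId;

    Config(String projectId, String instanceId) {
      this.projectId = projectId;
      this.instanceId = instanceId;
    }

    /** Mirrors the idea of a config validating its own required fields. */
    void validate() {
      if (projectId == null) {
        throw new IllegalArgumentException("Missing required projectId field.");
      }
      if (instanceId == null) {
        throw new IllegalArgumentException("Missing required instanceId field.");
      }
    }
  }

  private final Config config;
  private final String tableId;
  int getterCalls = 0; // counts getter invocations so the hoisting is observable

  HoistConfigSketch(Config config, String tableId) {
    this.config = config;
    this.tableId = tableId;
  }

  Config getConfig() {
    getterCalls++;
    return config;
  }

  /** Reads the config once, then runs all checks against the local variable. */
  String expand() {
    Config bigtableConfig = getConfig(); // hoisted above the checks
    if (bigtableConfig == null) {
      throw new IllegalArgumentException("Missing required configuration fields.");
    }
    bigtableConfig.validate();
    if (tableId == null) {
      throw new IllegalArgumentException("Missing required tableId field.");
    }
    return bigtableConfig.projectId + "/" + bigtableConfig.instanceId + "/" + tableId;
  }
}
```

   The call counter exists only to make the single read visible in the sketch;
the real method would not need it.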



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

