nancyxu123 commented on code in PR #30361:
URL: https://github.com/apache/beam/pull/30361#discussion_r1499613661


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance

Review Comment:
   Could we also add an integration test (IT) that sets the credentials? It could live in the same directory as this one:

   https://github.com/apache/beam/blob/59e9ee9e02e9f95d4cdaee065529594ae86a72d4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java#L61



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1773,6 +1759,37 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
           .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
       return dataChangeRecordsOut;
     }
+
+    public SpannerConfig buildChangeStreamSpannerConfig(PipelineOptions pipelineOptions) {
+      SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
+      // Set default retryable errors for ReadChangeStream
+      if (changeStreamSpannerConfig.getRetryableCodes() == null) {
+        ImmutableSet<Code> defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
+        changeStreamSpannerConfig =
+            changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build();
+      }
+      // Set default retry timeouts for ReadChangeStream
+      if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
+        changeStreamSpannerConfig =
+            changeStreamSpannerConfig
+                .toBuilder()
+                .setExecuteStreamingSqlRetrySettings(
+                    RetrySettings.newBuilder()
+                        .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
+                        .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+                        .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+                        .build())
+                .build();
+      }
+      // If credentials are not set in SpannerConfig, check pipeline options for credentials.
+      if (changeStreamSpannerConfig.getCredentials() == null) {
+        final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+        if (credentials != null) {
+          changeStreamSpannerConfig = changeStreamSpannerConfig.withCredentials(credentials);
+        }
+      }

Review Comment:
   Do we really need this part? The old code doesn't seem to have required propagating the gcpCredentials.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

