dengwe1 commented on code in PR #30361:
URL: https://github.com/apache/beam/pull/30361#discussion_r1505028367
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java:
##########
@@ -1773,6 +1759,37 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
.apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
return dataChangeRecordsOut;
}
+
+ public SpannerConfig buildChangeStreamSpannerConfig(PipelineOptions pipelineOptions) {
+ SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
+ // Set default retryable errors for ReadChangeStream
+ if (changeStreamSpannerConfig.getRetryableCodes() == null) {
+ ImmutableSet<Code> defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED);
+ changeStreamSpannerConfig =
+ changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build();
+ }
+ // Set default retry timeouts for ReadChangeStream
+ if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) {
+ changeStreamSpannerConfig =
+ changeStreamSpannerConfig
+ .toBuilder()
+ .setExecuteStreamingSqlRetrySettings(
+ RetrySettings.newBuilder()
+ .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5))
+ .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+ .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1))
+ .build())
+ .build();
+ }
+ // If credentials are not set in SpannerConfig, check pipeline options for credentials.
+ if (changeStreamSpannerConfig.getCredentials() == null) {
+ final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+ if (credentials != null) {
+ changeStreamSpannerConfig = changeStreamSpannerConfig.withCredentials(credentials);
+ }
+ }
Review Comment:
I updated the logic. We only need to propagate the pipeline credentials to the getDialect() calls.
We don't need to do this for the changeStreamSpannerConfig that is passed to the DaoFactory, because there the credentials are handled by Dataflow (when running on Dataflow).
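A minimal sketch of the credential scoping described in the comment, written in plain Java so it runs without Beam on the classpath. `Config`, `configForDialectLookup`, and `configForDaoFactory` are hypothetical stand-ins (not Beam's `SpannerConfig`, `GcpOptions`, or `DaoFactory` API), and credentials are modeled as a plain `String`. The pipeline-credential fallback is applied only to the config used for the dialect lookup; the config handed to the DAO factory keeps whatever credentials the user set (possibly none).

```java
// Hypothetical sketch: simplified stand-ins, NOT the Beam SpannerIO classes.
public class CredentialScopingSketch {

  /** Immutable config; a null credentials field means "not set by the user". */
  record Config(String projectId, String credentials) {
    Config withCredentials(String creds) {
      return new Config(projectId, creds);
    }
  }

  /**
   * Config used only for the dialect lookup: if the user set no credentials,
   * fall back to the credentials from the (hypothetical) pipeline options.
   */
  static Config configForDialectLookup(Config userConfig, String pipelineCredentials) {
    if (userConfig.credentials() == null && pipelineCredentials != null) {
      return userConfig.withCredentials(pipelineCredentials);
    }
    return userConfig;
  }

  /**
   * Config handed to the DAO factory: pipeline credentials are NOT copied in,
   * leaving credential handling to the runner. (Retry defaults from the diff
   * are omitted from this sketch.)
   */
  static Config configForDaoFactory(Config userConfig) {
    return userConfig;
  }

  public static void main(String[] args) {
    Config user = new Config("my-project", null);
    Config dialectCfg = configForDialectLookup(user, "pipeline-creds");
    Config daoCfg = configForDaoFactory(user);
    System.out.println(dialectCfg.credentials()); // pipeline-creds
    System.out.println(daoCfg.credentials()); // null
  }
}
```

Keeping the fallback confined to one helper means only the dialect lookup sees the pipeline credentials, while explicitly configured credentials always take precedence.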