nancyxu123 commented on code in PR #30361: URL: https://github.com/apache/beam/pull/30361#discussion_r1499613661
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadChangeStreamTest.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance Review Comment: Could we also add an IT where we set the credentials in this directory? https://github.com/apache/beam/blob/59e9ee9e02e9f95d4cdaee065529594ae86a72d4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java#L61 ########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java: ########## @@ -1773,6 +1759,37 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory))); return dataChangeRecordsOut; } + + public SpannerConfig buildChangeStreamSpannerConfig(PipelineOptions pipelineOptions) { + SpannerConfig changeStreamSpannerConfig = getSpannerConfig(); + // Set default retryable errors for ReadChangeStream + if (changeStreamSpannerConfig.getRetryableCodes() == null) { + ImmutableSet<Code> defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED); + changeStreamSpannerConfig = + changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build(); + } + // Set default retry timeouts for ReadChangeStream + if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) { + changeStreamSpannerConfig = + changeStreamSpannerConfig + .toBuilder() + .setExecuteStreamingSqlRetrySettings( + RetrySettings.newBuilder() + .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5)) + .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) + .build()) + .build(); + } + // If credentials are not set in SpannerConfig, check pipeline options for credentials. + if (changeStreamSpannerConfig.getCredentials() == null) { + final Credentials credentials = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + if (credentials != null) { + changeStreamSpannerConfig = changeStreamSpannerConfig.withCredentials(credentials); + } + } Review Comment: Do we really need to do this part? It seems like the old code didn't require propagating the gcpCredentials? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
