This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 983d9034b85 Added MetadataSpannerConfig class for generating
SpannerConfig for accessing change stream metadata database (#25193)
983d9034b85 is described below
commit 983d9034b85b3e0f8dfc2e5103dda9212fe546b6
Author: Doug Judd <[email protected]>
AuthorDate: Fri Feb 10 08:18:14 2023 -0800
Added MetadataSpannerConfig class for generating SpannerConfig for
accessing change stream metadata database (#25193)
---
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 10 +-
.../MetadataSpannerConfigFactory.java | 118 +++++++++++++++++++++
.../gcp/spanner/changestreams/dao/DaoFactory.java | 2 +-
3 files changed, 122 insertions(+), 8 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index b4a451378ce..44cffc201ba 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -72,6 +72,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
+import
org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
@@ -89,7 +90,6 @@ import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -1625,12 +1625,8 @@ public class SpannerIO {
.build();
}
final SpannerConfig partitionMetadataSpannerConfig =
- changeStreamSpannerConfig
- .toBuilder()
-
.setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId))
-
.setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId))
- .setDatabaseRole(null)
- .build();
+ MetadataSpannerConfigFactory.create(
+ changeStreamSpannerConfig, partitionMetadataInstanceId,
partitionMetadataDatabaseId);
Dialect changeStreamDatabaseDialect =
getDialect(changeStreamSpannerConfig);
Dialect metadataDatabaseDialect =
getDialect(partitionMetadataSpannerConfig);
LOG.info(
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
new file mode 100644
index 00000000000..83965b1bfaa
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/MetadataSpannerConfigFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.spanner.Options.RpcPriority;
+import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+import org.joda.time.Duration;
+
+/**
+ * Factory for the {@link SpannerConfig} used to access the change stream metadata database. The
+ * config is built from a fresh {@code SpannerConfig.create()}: only selected fields of the
+ * primary database's SpannerConfig are copied onto it, and the database role is not copied.
+ */
+public class MetadataSpannerConfigFactory {
+
+ /**
+ * Generates a SpannerConfig that can be used to access the change stream metadata database by
+ * copying only the necessary fields from the given primary database SpannerConfig and setting
+ * the instance ID and database ID to the supplied metadata values.
+ *
+ * <p>Fields copied when they are non-null on {@code primaryConfig}: project ID, host, emulator
+ * host, the is-local-channel-provider flag, commit deadline, max cumulative backoff, the
+ * executeStreamingSql and commit retry settings, the retryable codes and the RPC priority.
+ * Each {@link ValueProvider} field is read with {@code get()} inside this method and stored as
+ * a {@link StaticValueProvider}, so the returned config holds the values seen at call time.
+ * The retry settings and retryable codes are passed through as the same objects.
+ *
+ * @param primaryConfig The SpannerConfig for accessing the primary database; it is dereferenced
+ *     without a null check
+ * @param metadataInstanceId The instance ID of the metadata database; must not be null
+ * @param metadataDatabaseId The database ID of the metadata database; must not be null
+ * @return the metadata SpannerConfig
+ * @throws NullPointerException if {@code metadataInstanceId} or {@code metadataDatabaseId} is
+ *     null
+ */
+ public static SpannerConfig create(
+ SpannerConfig primaryConfig, String metadataInstanceId, String
metadataDatabaseId) {
+
+ checkNotNull(
+ metadataInstanceId,
+ "MetadataSpannerConfigFactory.create requires non-null metadata
instance id");
+ checkNotNull(
+ metadataDatabaseId,
+ "MetadataSpannerConfigFactory.create requires non-null metadata
database id");
+
+ // NOTE: databaseRole should NOT be copied to the metadata config (the config below starts from SpannerConfig.create(), not from primaryConfig, so nothing is carried over implicitly)
+
+ SpannerConfig config =
+ SpannerConfig.create()
+ .withInstanceId(StaticValueProvider.of(metadataInstanceId))
+ .withDatabaseId(StaticValueProvider.of(metadataDatabaseId));
+
+ ValueProvider<String> projectId = primaryConfig.getProjectId();
+ if (projectId != null) {
+ config = config.withProjectId(StaticValueProvider.of(projectId.get()));
+ }
+
+ ValueProvider<String> host = primaryConfig.getHost();
+ if (host != null) {
+ config = config.withHost(StaticValueProvider.of(host.get()));
+ }
+
+ ValueProvider<String> emulatorHost = primaryConfig.getEmulatorHost();
+ if (emulatorHost != null) {
+ config =
config.withEmulatorHost(StaticValueProvider.of(emulatorHost.get()));
+ }
+
+ ValueProvider<Boolean> isLocalChannelProvider =
primaryConfig.getIsLocalChannelProvider();
+ if (isLocalChannelProvider != null) {
+ config =
+
config.withIsLocalChannelProvider(StaticValueProvider.of(isLocalChannelProvider.get()));
+ }
+
+ ValueProvider<Duration> commitDeadline = primaryConfig.getCommitDeadline();
+ if (commitDeadline != null) {
+ config =
config.withCommitDeadline(StaticValueProvider.of(commitDeadline.get()));
+ }
+
+ ValueProvider<Duration> maxCumulativeBackoff =
primaryConfig.getMaxCumulativeBackoff();
+ if (maxCumulativeBackoff != null) {
+ config =
config.withMaxCumulativeBackoff(StaticValueProvider.of(maxCumulativeBackoff.get()));
+ }
+
+ RetrySettings executeStreamingSqlRetrySettings =
+ primaryConfig.getExecuteStreamingSqlRetrySettings();
+ if (executeStreamingSqlRetrySettings != null) {
+ config =
config.withExecuteStreamingSqlRetrySettings(executeStreamingSqlRetrySettings);
+ }
+
+ RetrySettings commitRetrySettings = primaryConfig.getCommitRetrySettings();
+ if (commitRetrySettings != null) {
+ config = config.withCommitRetrySettings(commitRetrySettings);
+ }
+
+ ImmutableSet<Code> retryableCodes = primaryConfig.getRetryableCodes();
+ if (retryableCodes != null) {
+ config = config.withRetryableCodes(retryableCodes);
+ }
+
+ ValueProvider<RpcPriority> rpcPriority = primaryConfig.getRpcPriority();
+ if (rpcPriority != null) {
+ config =
config.withRpcPriority(StaticValueProvider.of(rpcPriority.get()));
+ }
+
+ return config;
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
index 0b40ddaccad..b9718fdb675 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java
@@ -77,7 +77,7 @@ public class DaoFactory implements Serializable {
}
this.changeStreamSpannerConfig = changeStreamSpannerConfig;
this.changeStreamName = changeStreamName;
- this.metadataSpannerConfig = metadataSpannerConfig.withDatabaseRole(null);
+ this.metadataSpannerConfig = metadataSpannerConfig;
this.partitionMetadataTableName = partitionMetadataTableName;
this.rpcPriority = rpcPriority;
this.jobName = jobName;