Repository: beam Updated Branches: refs/heads/master f3de7363c -> c33cb0340
Adds SpannerAccessor - a utility for DoFn's that use Spanner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9c2e816 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9c2e816 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9c2e816 Branch: refs/heads/master Commit: c9c2e81672676e3ec705269a94f11fb1a2596c48 Parents: f3de736 Author: Mairbek Khadikov <[email protected]> Authored: Mon Aug 7 12:33:19 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Aug 24 15:51:41 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/spanner/AbstractSpannerFn.java | 71 -------------------- .../sdk/io/gcp/spanner/CreateTransactionFn.java | 22 ++++-- .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 18 +++-- .../sdk/io/gcp/spanner/SpannerAccessor.java | 43 ++++++++++++ .../beam/sdk/io/gcp/spanner/SpannerConfig.java | 22 ++++++ .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 24 ++++--- 6 files changed, 111 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java deleted file mode 100644 index 50efdea..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.ReleaseInfo; - -/** - * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link - * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client. - */ -abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, OutputT> { - // A common user agent token that indicates that this request was originated from Apache Beam. - private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; - - private transient Spanner spanner; - private transient DatabaseClient databaseClient; - - abstract SpannerConfig getSpannerConfig(); - - @Setup - public void setup() throws Exception { - SpannerConfig spannerConfig = getSpannerConfig(); - SpannerOptions.Builder builder = SpannerOptions.newBuilder(); - if (spannerConfig.getProjectId() != null) { - builder.setProjectId(spannerConfig.getProjectId().get()); - } - if (spannerConfig.getServiceFactory() != null) { - builder.setServiceFactory(spannerConfig.getServiceFactory()); - } - ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo(); - builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion()); - SpannerOptions options = builder.build(); - spanner = options.getService(); - databaseClient = spanner.getDatabaseClient(DatabaseId - .of(options.getProjectId(), spannerConfig.getInstanceId().get(), - spannerConfig.getDatabaseId().get())); - } - - @Teardown - public void teardown() throws Exception { - if (spanner == null) { - return; - } - spanner.close(); - spanner = null; - } - - protected DatabaseClient databaseClient() { - return databaseClient; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java index da8e8b1..5574ae1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java @@ -17,12 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ReadOnlyTransaction; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Statement; +import org.apache.beam.sdk.transforms.DoFn; /** Creates a batch transaction. */ -class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> { +class CreateTransactionFn extends DoFn<Object, Transaction> { private final SpannerIO.CreateTransaction config; @@ -30,10 +32,22 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> { this.config = config; } + private transient SpannerAccessor spannerAccessor; + + @Setup + public void setup() throws Exception { + spannerAccessor = config.getSpannerConfig().connectToSpanner(); + } + @Teardown + public void teardown() throws Exception { + spannerAccessor.close(); + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { + DatabaseClient databaseClient = spannerAccessor.getDatabaseClient(); try (ReadOnlyTransaction readOnlyTransaction = - databaseClient().readOnlyTransaction(config.getTimestampBound())) { + databaseClient.readOnlyTransaction(config.getTimestampBound())) { // Run a dummy sql statement to force the RPC and obtain the timestamp from the server. ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1")); while (resultSet.next()) { @@ -44,8 +58,4 @@ class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> { } } - @Override - SpannerConfig getSpannerConfig() { - return config.getSpannerConfig(); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java index 92b3fe3..5dc6ead 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java @@ -17,19 +17,22 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ReadOnlyTransaction; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; /** A simplest read function implementation. Parallelism support is coming. */ @VisibleForTesting -class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> { +class NaiveSpannerReadFn extends DoFn<ReadOperation, Struct> { private final SpannerConfig config; @Nullable private final PCollectionView<Transaction> transaction; + private transient SpannerAccessor spannerAccessor; NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) { this.config = config; @@ -40,8 +43,14 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> { this(config, null); } - SpannerConfig getSpannerConfig() { - return config; + + @Setup + public void setup() throws Exception { + spannerAccessor = config.connectToSpanner(); + } + @Teardown + public void teardown() throws Exception { + spannerAccessor.close(); } @ProcessElement @@ -52,8 +61,9 @@ class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> { timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp()); } ReadOperation op = c.element(); + DatabaseClient databaseClient = spannerAccessor.getDatabaseClient(); try (ReadOnlyTransaction readOnlyTransaction = - databaseClient().readOnlyTransaction(timestampBound)) { + databaseClient.readOnlyTransaction(timestampBound)) { ResultSet resultSet = execute(op, readOnlyTransaction); while (resultSet.next()) { c.output(resultSet.getCurrentRowAsStruct()); http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java new file mode 100644 index 0000000..f32e661 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Spanner; + +/** + * Manages lifecycle of {@link DatabaseClient} and {@link Spanner} instances. + */ +public class SpannerAccessor implements AutoCloseable { + private final Spanner spanner; + private final DatabaseClient databaseClient; + + SpannerAccessor(Spanner spanner, DatabaseClient databaseClient) { + this.spanner = spanner; + this.databaseClient = databaseClient; + } + + public DatabaseClient getDatabaseClient() { + return databaseClient; + } + + @Override + public void close() { + spanner.close(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 034c38a..6646f32 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; @@ -29,10 +31,13 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.ReleaseInfo; /** Configuration for a Cloud Spanner client. */ @AutoValue public abstract class SpannerConfig implements Serializable { + // A common user agent token that indicates that this request was originated from Apache Beam. + private static final String USER_AGENT_PREFIX = "Apache_Beam_Java"; @Nullable abstract ValueProvider<String> getProjectId(); @@ -123,4 +128,21 @@ public abstract class SpannerConfig implements Serializable { return toBuilder().setServiceFactory(serviceFactory).build(); } + public SpannerAccessor connectToSpanner() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + if (getProjectId() != null) { + builder.setProjectId(getProjectId().get()); + } + if (getServiceFactory() != null) { + builder.setServiceFactory(this.getServiceFactory()); + } + ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo(); + builder.setUserAgentPrefix(USER_AGENT_PREFIX + "/" + releaseInfo.getVersion()); + SpannerOptions options = builder.build(); + Spanner spanner = options.getService(); + DatabaseClient databaseClient = spanner.getDatabaseClient( + DatabaseId.of(options.getProjectId(), getInstanceId().get(), getDatabaseId().get())); + return new SpannerAccessor(spanner, databaseClient); + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/c9c2e816/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java index 34a11da..9343c0c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.DatabaseClient; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; @@ -25,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; @@ -35,7 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Batches together and writes mutations to Google Cloud Spanner. */ -@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> { +@VisibleForTesting +class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> { private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); private final SpannerIO.Write spec; // Current batch of mutations to be written. @@ -48,21 +51,25 @@ import org.slf4j.LoggerFactory; .withMaxRetries(MAX_RETRIES) .withInitialBackoff(Duration.standardSeconds(5)); - @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) { - this.spec = spec; - } + private transient SpannerAccessor spannerAccessor; - @Override SpannerConfig getSpannerConfig() { - return spec.getSpannerConfig(); + @VisibleForTesting + SpannerWriteGroupFn(SpannerIO.Write spec) { + this.spec = spec; } @Setup public void setup() throws Exception { - super.setup(); + spannerAccessor = spec.getSpannerConfig().connectToSpanner(); mutations = new ArrayList<>(); batchSizeBytes = 0; } + @Teardown + public void teardown() throws Exception { + spannerAccessor.close(); + } + @ProcessElement public void processElement(ProcessContext c) throws Exception { MutationGroup m = c.element(); @@ -94,10 +101,11 @@ import org.slf4j.LoggerFactory; Sleeper sleeper = Sleeper.DEFAULT; BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + DatabaseClient databaseClient = spannerAccessor.getDatabaseClient(); while (true) { // Batch upsert rows. try { - databaseClient().writeAtLeastOnce(Iterables.concat(mutations)); + databaseClient.writeAtLeastOnce(Iterables.concat(mutations)); // Break if the commit threw no exception. break;
