http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java new file mode 100644 index 0000000..c2b62b7 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; + + +/** + * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. 
+ */ +@VisibleForTesting +class ShardedKeyCoder<KeyT> + extends StructuredCoder<ShardedKey<KeyT>> { + public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) { + return new ShardedKeyCoder<>(keyCoder); + } + + private final Coder<KeyT> keyCoder; + private final VarIntCoder shardNumberCoder; + + protected ShardedKeyCoder(Coder<KeyT> keyCoder) { + this.keyCoder = keyCoder; + this.shardNumberCoder = VarIntCoder.of(); + } + + @Override + public List<? extends Coder<?>> getCoderArguments() { + return Arrays.asList(keyCoder); + } + + @Override + public void encode(ShardedKey<KeyT> key, OutputStream outStream) + throws IOException { + keyCoder.encode(key.getKey(), outStream); + shardNumberCoder.encode(key.getShardNumber(), outStream); + } + + @Override + public ShardedKey<KeyT> decode(InputStream inStream) + throws IOException { + return new ShardedKey<>( + keyCoder.decode(inStream), + shardNumberCoder.decode(inStream)); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + keyCoder.verifyDeterministic(); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index a210858..63e5bc1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index fa5b3ce..18b2033 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.coders.KvCoder; 
-import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -30,7 +29,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index 51b9375..cd88222 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ShardedKey; /** * Fn that tags each table row with a unique id and destination table. 
To avoid calling http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index e1ed746..d68779a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkNotNull; - import com.google.api.services.bigquery.model.TableRow; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,7 +40,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 887cb93..45dc2a8 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -22,7 +22,6 @@ import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ShardedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index 451d1bd..acd1132 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; /** http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 9ed2916..c5494d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.ShardedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 0a90dde..1692cda 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -175,7 +175,7 @@ import org.slf4j.LoggerFactory; * pipeline. Please refer to the documentation of corresponding * {@link PipelineRunner PipelineRunners} for more details. 
*/ -@Experimental(Experimental.Kind.SOURCE_SINK) +@Experimental public class BigtableIO { private static final Logger LOG = LoggerFactory.getLogger(BigtableIO.class); @@ -211,7 +211,7 @@ public class BigtableIO { * * @see BigtableIO */ - @Experimental(Experimental.Kind.SOURCE_SINK) + @Experimental @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<Row>> { @@ -415,7 +415,7 @@ public class BigtableIO { * * @see BigtableIO */ - @Experimental(Experimental.Kind.SOURCE_SINK) + @Experimental @AutoValue public abstract static class Write extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { @@ -1027,7 +1027,7 @@ public class BigtableIO { "{}: Failed to interpolate key for fraction {}.", rangeTracker.getRange(), fraction, e); return null; } - LOG.info( + LOG.debug( "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); BigtableSource primary; BigtableSource residual; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 07476e2..d1a17fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -168,13 +168,14 @@ class BigtableServiceImpl implements BigtableService { private BigtableSession session; private AsyncExecutor executor; private BulkMutation bulkMutation; - private final String tableName; + private final MutateRowRequest.Builder partialBuilder; public 
BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) { this.session = session; executor = session.createAsyncExecutor(); bulkMutation = session.createBulkMutation(tableName, executor); - this.tableName = tableName.toString(); + + partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName.toString()); } @Override @@ -207,8 +208,8 @@ class BigtableServiceImpl implements BigtableService { KV<ByteString, Iterable<Mutation>> record) throws IOException { MutateRowRequest r = - MutateRowRequest.newBuilder() - .setTableName(tableName) + partialBuilder + .clone() .setRowKey(record.getKey()) .addAllMutations(record.getValue()) .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java deleted file mode 100644 index ce6ebe6..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import com.google.common.annotations.VisibleForTesting; -import java.util.Random; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.MovingFunction; - - -/** - * An implementation of client-side adaptive throttling. See - * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg - * for a full discussion of the use case and algorithm applied. - */ -class AdaptiveThrottler { - private final MovingFunction successfulRequests; - private final MovingFunction allRequests; - - /** The target ratio between requests sent and successful requests. This is "K" in the formula in - * https://landing.google.com/sre/book/chapters/handling-overload.html */ - private final double overloadRatio; - - /** The target minimum number of requests per samplePeriodMs, even if no requests succeed. Must be - * greater than 0, else we could throttle to zero. Because every decision is probabilistic, there - * is no guarantee that the request rate in any given interval will not be zero. (This is the +1 - * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html */ - private static final double MIN_REQUESTS = 1; - private final Random random; - - /** - * @param samplePeriodMs the time window to keep of request history to inform throttling - * decisions. - * @param sampleUpdateMs the length of buckets within this time window. - * @param overloadRatio the target ratio between requests sent and successful requests. 
You should - * always set this to more than 1, otherwise the client would never try to send more requests than - * succeeded in the past - so it could never recover from temporary setbacks. - */ - public AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs, - double overloadRatio) { - this(samplePeriodMs, sampleUpdateMs, overloadRatio, new Random()); - } - - @VisibleForTesting - AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs, - double overloadRatio, Random random) { - allRequests = - new MovingFunction(samplePeriodMs, sampleUpdateMs, - 1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs()); - successfulRequests = - new MovingFunction(samplePeriodMs, sampleUpdateMs, - 1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs()); - this.overloadRatio = overloadRatio; - this.random = random; - } - - @VisibleForTesting - double throttlingProbability(long nowMsSinceEpoch) { - if (!allRequests.isSignificant()) { - return 0; - } - long allRequestsNow = allRequests.get(nowMsSinceEpoch); - long successfulRequestsNow = successfulRequests.get(nowMsSinceEpoch); - return Math.max(0, - (allRequestsNow - overloadRatio * successfulRequestsNow) / (allRequestsNow + MIN_REQUESTS)); - } - - /** - * Call this before sending a request to the remote service; if this returns true, drop the - * request (treating it as a failure or trying it again at a later time). - */ - public boolean throttleRequest(long nowMsSinceEpoch) { - double delayProbability = throttlingProbability(nowMsSinceEpoch); - // Note that we increment the count of all requests here, even if we return true - so even if we - // tell the client not to send a request at all, it still counts as a failed request. - allRequests.add(nowMsSinceEpoch, 1); - - return (random.nextDouble() < delayProbability); - } - - /** - * Call this after {@link throttleRequest} if your request was successful. 
- */ - public void successfulRequest(long nowMsSinceEpoch) { - successfulRequests.add(nowMsSinceEpoch, 1); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 5f65428..b198a6f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -71,8 +71,6 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -203,31 +201,11 @@ public class DatastoreV1 { DatastoreV1() {} /** - * The number of entity updates written per RPC, initially. We buffer updates in the connector and - * write a batch to Datastore once we have collected a certain number. This is the initial batch - * size; it is adjusted at runtime based on the performance of previous writes (see {@link - * DatastoreV1.WriteBatcher}). - * - * <p>Testing has found that a batch of 200 entities will generally finish within the timeout even - * in adverse conditions. 
- */ - @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200; - - /** - * When choosing the number of updates in a single RPC, never exceed the maximum allowed by the - * API. + * Cloud Datastore has a limit of 500 mutations per batch operation, so we flush + * changes to Datastore every 500 entities. */ @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500; - - /** - * When choosing the number of updates in a single RPC, do not go below this value. The actual - * number of entities per request may be lower when we flush for the end of a bundle or if we hit - * {@link DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT}. - */ - @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10; + static final int DATASTORE_BATCH_UPDATE_LIMIT = 500; /** * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the total size of mutations @@ -235,7 +213,7 @@ public class DatastoreV1 { * the mutations themselves and not the CommitRequest wrapper around them. */ @VisibleForTesting - static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 9_000_000; + static final int DATASTORE_BATCH_UPDATE_BYTES_LIMIT = 5_000_000; /** * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId}, @@ -1129,74 +1107,18 @@ public class DatastoreV1 { } } - /** Determines batch sizes for commit RPCs. */ - @VisibleForTesting - interface WriteBatcher { - /** Call before using this WriteBatcher. */ - void start(); - - /** - * Reports the latency of a previous commit RPC, and the number of mutations that it contained. - */ - void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations); - - /** Returns the number of entities to include in the next CommitRequest. */ - int nextBatchSize(long timeSinceEpochMillis); - } - - /** - * Determines batch sizes for commit RPCs based on past performance. 
- * - * <p>It aims for a target response time per RPC: it uses the response times for previous RPCs - * and the number of entities contained in them, calculates a rolling average time-per-entity, and - * chooses the number of entities for future writes to hit the target time. - * - * <p>This enables us to send large batches without sending over-large requests in the case of - * expensive entity writes that may timeout before the server can apply them all. - */ - @VisibleForTesting - static class WriteBatcherImpl implements WriteBatcher, Serializable { - /** Target time per RPC for writes. */ - static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000; - - @Override - public void start() { - meanLatencyPerEntityMs = new MovingAverage( - 120000 /* sample period 2 minutes */, 10000 /* sample interval 10s */, - 1 /* numSignificantBuckets */, 1 /* numSignificantSamples */); - } - - @Override - public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) { - meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / numMutations); - } - - @Override - public int nextBatchSize(long timeSinceEpochMillis) { - if (!meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) { - return DATASTORE_BATCH_UPDATE_ENTITIES_START; - } - long recentMeanLatency = Math.max(meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1); - return (int) Math.max(DATASTORE_BATCH_UPDATE_ENTITIES_MIN, - Math.min(DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, - DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency)); - } - - private transient MovingAverage meanLatencyPerEntityMs; - } - /** * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in - * batches; see {@link DatastoreV1.WriteBatcherImpl}. + * batches, where the maximum batch size is {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}. 
* * <p>See <a * href="https://cloud.google.com/datastore/docs/concepts/entities"> * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations. * * <p>Commits are non-transactional. If a commit fails because of a conflict over an entity - group, the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES} + group, the commit will be retried (up to {@link DatastoreV1.DatastoreWriterFn#MAX_RETRIES} * times). This means that the mutation operation should be idempotent. Thus, the writer should - only be used for {@code upsert} and {@code delete} mutation operations, as these are the only + only be used for {@code upsert} and {@code delete} mutation operations, as these are the only * two Cloud Datastore mutations that are idempotent. */ @VisibleForTesting @@ -1210,14 +1132,6 @@ public class DatastoreV1 { // Current batch of mutations to be written. private final List<Mutation> mutations = new ArrayList<>(); private int mutationsSize = 0; // Accumulated size of protos in mutations. 
- private WriteBatcher writeBatcher; - private transient AdaptiveThrottler throttler; - private final Counter throttledSeconds = - Metrics.counter(DatastoreWriterFn.class, "cumulativeThrottlingSeconds"); - private final Counter rpcErrors = - Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors"); - private final Counter rpcSuccesses = - Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses"); private static final int MAX_RETRIES = 5; private static final FluentBackoff BUNDLE_WRITE_BACKOFF = @@ -1225,31 +1139,24 @@ public class DatastoreV1 { .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); DatastoreWriterFn(String projectId, @Nullable String localhost) { - this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory(), - new WriteBatcherImpl()); + this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory()); } DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost) { - this(projectId, localhost, new V1DatastoreFactory(), new WriteBatcherImpl()); + this(projectId, localhost, new V1DatastoreFactory()); } @VisibleForTesting DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String localhost, - V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) { + V1DatastoreFactory datastoreFactory) { this.projectId = checkNotNull(projectId, "projectId"); this.localhost = localhost; this.datastoreFactory = datastoreFactory; - this.writeBatcher = writeBatcher; } @StartBundle public void startBundle(StartBundleContext c) { datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost); - writeBatcher.start(); - if (throttler == null) { - // Initialize throttler at first use, because it is not serializable. 
- throttler = new AdaptiveThrottler(120000, 10000, 1.25); - } } @ProcessElement @@ -1262,7 +1169,7 @@ public class DatastoreV1 { } mutations.add(c.element()); mutationsSize += size; - if (mutations.size() >= writeBatcher.nextBatchSize(System.currentTimeMillis())) { + if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) { flushBatch(); } } @@ -1292,42 +1199,18 @@ public class DatastoreV1 { while (true) { // Batch upsert entities. - CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); - commitRequest.addAllMutations(mutations); - commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); - long startTime = System.currentTimeMillis(), endTime; - - if (throttler.throttleRequest(startTime)) { - LOG.info("Delaying request due to previous failures"); - throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000); - sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS); - continue; - } - try { + CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); + commitRequest.addAllMutations(mutations); + commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); datastore.commit(commitRequest.build()); - endTime = System.currentTimeMillis(); - - writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size()); - throttler.successfulRequest(startTime); - rpcSuccesses.inc(); - // Break if the commit threw no exception. break; } catch (DatastoreException exception) { - if (exception.getCode() == Code.DEADLINE_EXCEEDED) { - /* Most errors are not related to request size, and should not change our expectation of - * the latency of successful requests. DEADLINE_EXCEEDED can be taken into - * consideration, though. */ - endTime = System.currentTimeMillis(); - writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size()); - } // Only log the code and message for potentially-transient errors. The entire exception // will be propagated upon the last retry. 
- LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", mutations.size(), - exception.getCode(), exception.getMessage()); - rpcErrors.inc(); - + LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), + exception.getMessage()); if (!BackOffUtils.next(sleeper, backoff)) { LOG.error("Aborting after {} retries.", MAX_RETRIES); throw exception; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java deleted file mode 100644 index 0890e79..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.sdk.io.gcp.datastore; - -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.MovingFunction; - - -class MovingAverage { - private final MovingFunction sum; - private final MovingFunction count; - - public MovingAverage(long samplePeriodMs, long sampleUpdateMs, - int numSignificantBuckets, int numSignificantSamples) { - sum = new MovingFunction(samplePeriodMs, sampleUpdateMs, - numSignificantBuckets, numSignificantSamples, Sum.ofLongs()); - count = new MovingFunction(samplePeriodMs, sampleUpdateMs, - numSignificantBuckets, numSignificantSamples, Sum.ofLongs()); - } - - public void add(long nowMsSinceEpoch, long value) { - sum.add(nowMsSinceEpoch, value); - count.add(nowMsSinceEpoch, 1); - } - - public long get(long nowMsSinceEpoch) { - return sum.get(nowMsSinceEpoch) / count.get(nowMsSinceEpoch); - } - - public boolean hasValue(long nowMsSinceEpoch) { - return sum.isSignificant() && count.isSignificant() - && count.get(nowMsSinceEpoch) > 0; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java deleted file mode 100644 index 00008f1..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link - * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client. 
- */ -abstract class AbstractSpannerFn<InputT, OutputT> extends DoFn<InputT, OutputT> { - private transient Spanner spanner; - private transient DatabaseClient databaseClient; - - abstract SpannerConfig getSpannerConfig(); - - @Setup - public void setup() throws Exception { - SpannerConfig spannerConfig = getSpannerConfig(); - SpannerOptions options = spannerConfig.buildSpannerOptions(); - spanner = options.getService(); - databaseClient = spanner.getDatabaseClient(DatabaseId - .of(options.getProjectId(), spannerConfig.getInstanceId().get(), - spannerConfig.getDatabaseId().get())); - } - - @Teardown - public void teardown() throws Exception { - if (spanner == null) { - return; - } - spanner.close(); - spanner = null; - } - - protected DatabaseClient databaseClient() { - return databaseClient; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java deleted file mode 100644 index da8e8b1..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.spanner.ResultSet; -import com.google.cloud.spanner.Statement; - -/** Creates a batch transaction. */ -class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> { - - private final SpannerIO.CreateTransaction config; - - CreateTransactionFn(SpannerIO.CreateTransaction config) { - this.config = config; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - try (ReadOnlyTransaction readOnlyTransaction = - databaseClient().readOnlyTransaction(config.getTimestampBound())) { - // Run a dummy sql statement to force the RPC and obtain the timestamp from the server. 
- ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1")); - while (resultSet.next()) { - // do nothing - } - Transaction tx = Transaction.create(readOnlyTransaction.getReadTimestamp()); - c.output(tx); - } - } - - @Override - SpannerConfig getSpannerConfig() { - return config.getSpannerConfig(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java deleted file mode 100644 index 5b08da2..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.Mutation; -import com.google.common.collect.ImmutableList; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - -/** - * A bundle of mutations that must be submitted atomically. - * - * <p>One of the mutations is chosen to be "primary", and can be used to determine partitions. - */ -public final class MutationGroup implements Serializable, Iterable<Mutation> { - private final ImmutableList<Mutation> mutations; - - /** - * Creates a new group. - * - * @param primary a primary mutation. - * @param other other mutations, usually interleaved in parent. - * @return new mutation group. - */ - public static MutationGroup create(Mutation primary, Mutation... other) { - return create(primary, Arrays.asList(other)); - } - - public static MutationGroup create(Mutation primary, Iterable<Mutation> other) { - return new MutationGroup(ImmutableList.<Mutation>builder().add(primary).addAll(other).build()); - } - - @Override - public Iterator<Mutation> iterator() { - return mutations.iterator(); - } - - private MutationGroup(ImmutableList<Mutation> mutations) { - this.mutations = mutations; - } - - public Mutation primary() { - return mutations.get(0); - } - - public List<Mutation> attached() { - return mutations.subList(1, mutations.size()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java index 2418816..61652e7 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java @@ -44,15 +44,6 @@ class MutationSizeEstimator { return result; } - /** Estimates a size of the mutation group in bytes. */ - public static long sizeOf(MutationGroup group) { - long result = 0; - for (Mutation m : group) { - result += sizeOf(m); - } - return result; - } - private static long estimatePrimitiveValue(Value v) { switch (v.getType().getCode()) { case BOOL: http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java deleted file mode 100644 index d193b95..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.spanner.ResultSet; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; -import com.google.common.annotations.VisibleForTesting; - -/** A simplest read function implementation. Parallelism support is coming. */ -@VisibleForTesting -class NaiveSpannerReadFn extends AbstractSpannerFn<Object, Struct> { - private final SpannerIO.Read config; - - NaiveSpannerReadFn(SpannerIO.Read config) { - this.config = config; - } - - SpannerConfig getSpannerConfig() { - return config.getSpannerConfig(); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - TimestampBound timestampBound = TimestampBound.strong(); - if (config.getTransaction() != null) { - Transaction transaction = c.sideInput(config.getTransaction()); - timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp()); - } - try (ReadOnlyTransaction readOnlyTransaction = - databaseClient().readOnlyTransaction(timestampBound)) { - ResultSet resultSet = execute(readOnlyTransaction); - while (resultSet.next()) { - c.output(resultSet.getCurrentRowAsStruct()); - } - } - } - - private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) { - if (config.getQuery() != null) { - return readOnlyTransaction.executeQuery(config.getQuery()); - } - if (config.getIndex() != null) { - return readOnlyTransaction.readUsingIndex( - config.getTable(), config.getIndex(), config.getKeySet(), config.getColumns()); - } - return readOnlyTransaction.read(config.getTable(), config.getKeySet(), config.getColumns()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java deleted file mode 100644 index 02716fb..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.auto.value.AutoValue; -import com.google.cloud.ServiceFactory; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import com.google.common.annotations.VisibleForTesting; -import java.io.Serializable; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.display.DisplayData; - -/** Configuration for a Cloud Spanner client. 
*/ -@AutoValue -public abstract class SpannerConfig implements Serializable { - - @Nullable - abstract ValueProvider<String> getProjectId(); - - @Nullable - abstract ValueProvider<String> getInstanceId(); - - @Nullable - abstract ValueProvider<String> getDatabaseId(); - - @Nullable - @VisibleForTesting - abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory(); - - abstract Builder toBuilder(); - - SpannerOptions buildSpannerOptions() { - SpannerOptions.Builder builder = SpannerOptions.newBuilder(); - if (getProjectId() != null) { - builder.setProjectId(getProjectId().get()); - } - if (getServiceFactory() != null) { - builder.setServiceFactory(getServiceFactory()); - } - return builder.build(); - } - - public static SpannerConfig create() { - return builder().build(); - } - - static Builder builder() { - return new AutoValue_SpannerConfig.Builder(); - } - - public void validate(PipelineOptions options) { - checkNotNull( - getInstanceId(), - "SpannerIO.read() requires instance id to be set with withInstanceId method"); - checkNotNull( - getDatabaseId(), - "SpannerIO.read() requires database id to be set with withDatabaseId method"); - } - - public void populateDisplayData(DisplayData.Builder builder) { - builder - .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) - .addIfNotNull(DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) - .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")); - - if (getServiceFactory() != null) { - builder.addIfNotNull( - DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) - .withLabel("Service Factory")); - } - } - - /** Builder for {@link SpannerConfig}. 
*/ - @AutoValue.Builder - public abstract static class Builder { - - abstract Builder setProjectId(ValueProvider<String> projectId); - - abstract Builder setInstanceId(ValueProvider<String> instanceId); - - abstract Builder setDatabaseId(ValueProvider<String> databaseId); - - abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory); - - public abstract SpannerConfig build(); - } - - public SpannerConfig withProjectId(ValueProvider<String> projectId) { - return toBuilder().setProjectId(projectId).build(); - } - - public SpannerConfig withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - public SpannerConfig withInstanceId(ValueProvider<String> instanceId) { - return toBuilder().setInstanceId(instanceId).build(); - } - - public SpannerConfig withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - public SpannerConfig withDatabaseId(ValueProvider<String> databaseId) { - return toBuilder().setDatabaseId(databaseId).build(); - } - - public SpannerConfig withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - @VisibleForTesting - SpannerConfig withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { - return toBuilder().setServiceFactory(serviceFactory).build(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index a247d4c..5058d13 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -17,39 +17,38 @@ */ package org.apache.beam.sdk.io.gcp.spanner; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.KeySet; +import com.google.cloud.ServiceOptions; +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; import com.google.common.annotations.VisibleForTesting; - -import java.util.Arrays; -import java.util.Collections; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; - import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import 
org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Experimental {@link PTransform Transforms} for reading from and writing to <a @@ -57,69 +56,7 @@ import org.apache.beam.sdk.values.PDone; * * <h3>Reading from Cloud Spanner</h3> * - * <p>To read from Cloud Spanner, apply {@link SpannerIO.Read} transformation. It will return a - * {@link PCollection} of {@link Struct Structs}, where each element represents - * an individual row returned from the read operation. Both Query and Read APIs are supported. - * See more information about <a href="https://cloud.google.com/spanner/docs/reads">reading from - * Cloud Spanner</a> - * - * <p>To execute a <strong>query</strong>, specify a {@link SpannerIO.Read#withQuery(Statement)} or - * {@link SpannerIO.Read#withQuery(String)} during the construction of the transform. - * - * <pre>{@code - * PCollection<Struct> rows = p.apply( - * SpannerIO.read() - * .withInstanceId(instanceId) - * .withDatabaseId(dbId) - * .withQuery("SELECT id, name, email FROM users")); - * }</pre> - * - * <p>To use the Read API, specify a {@link SpannerIO.Read#withTable(String) table name} and - * a {@link SpannerIO.Read#withColumns(List) list of columns}. - * - * <pre>{@code - * PCollection<Struct> rows = p.apply( - * SpannerIO.read() - * .withInstanceId(instanceId) - * .withDatabaseId(dbId) - * .withTable("users") - * .withColumns("id", "name", "email")); - * }</pre> - * - * <p>To optimally read using index, specify the index name using {@link SpannerIO.Read#withIndex}. - * - * <p>The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the - * power of read only transactions. Staleness of data can be controlled using - * {@link SpannerIO.Read#withTimestampBound} or {@link SpannerIO.Read#withTimestamp(Timestamp)} - * methods. 
<a href="https://cloud.google.com/spanner/docs/transactions">Read more</a> about - * transactions in Cloud Spanner. - * - * <p>It is possible to read several {@link PCollection PCollections} within a single transaction. - * Apply {@link SpannerIO#createTransaction()} transform, that lazily creates a transaction. The - * result of this transformation can be passed to read operation using - * {@link SpannerIO.Read#withTransaction(PCollectionView)}. - * - * <pre>{@code - * SpannerConfig spannerConfig = ... - * - * PCollectionView<Transaction> tx = - * p.apply( - * SpannerIO.createTransaction() - * .withSpannerConfig(spannerConfig) - * .withTimestampBound(TimestampBound.strong())); - * - * PCollection<Struct> users = p.apply( - * SpannerIO.read() - * .withSpannerConfig(spannerConfig) - * .withQuery("SELECT name, email FROM users") - * .withTransaction(tx)); - * - * PCollection<Struct> tweets = p.apply( - * SpannerIO.read() - * .withSpannerConfig(spannerConfig) - * .withQuery("SELECT user, tweet, date FROM tweets") - * .withTransaction(tx)); - * }</pre> + * <p>This functionality is not yet implemented. * * <h3>Writing to Cloud Spanner</h3> * @@ -151,11 +88,6 @@ import org.apache.beam.sdk.values.PDone; * <li>If the pipeline was unexpectedly stopped, mutations that were already applied will not get * rolled back. * </ul> - * - * <p>Use {@link MutationGroup} to ensure that a small set mutations is bundled together. It is - * guaranteed that mutations in a group are submitted in the same transaction. Build - * {@link SpannerIO.Write} transform, and call {@link Write#grouped()} method. It will return a - * transformation that can be applied to a PCollection of MutationGroup. */ @Experimental(Experimental.Kind.SOURCE_SINK) public class SpannerIO { @@ -163,33 +95,6 @@ public class SpannerIO { private static final long DEFAULT_BATCH_SIZE_BYTES = 1024 * 1024; // 1 MB /** - * Creates an uninitialized instance of {@link Read}. 
Before use, the {@link Read} must be - * configured with a {@link Read#withInstanceId} and {@link Read#withDatabaseId} that identify the - * Cloud Spanner database. - */ - @Experimental(Experimental.Kind.SOURCE_SINK) - public static Read read() { - return new AutoValue_SpannerIO_Read.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setTimestampBound(TimestampBound.strong()) - .setKeySet(KeySet.all()) - .build(); - } - - /** - * Returns a transform that creates a batch transaction. By default, - * {@link TimestampBound#strong()} transaction is created, to override this use - * {@link CreateTransaction#withTimestampBound(TimestampBound)}. - */ - @Experimental - public static CreateTransaction createTransaction() { - return new AutoValue_SpannerIO_CreateTransaction.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setTimestampBound(TimestampBound.strong()) - .build(); - } - - /** * Creates an uninitialized instance of {@link Write}. Before use, the {@link Write} must be * configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify * the Cloud Spanner database being written. @@ -197,408 +102,247 @@ public class SpannerIO { @Experimental public static Write write() { return new AutoValue_SpannerIO_Write.Builder() - .setSpannerConfig(SpannerConfig.create()) .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES) .build(); } /** - * A {@link PTransform} that reads data from Google Cloud Spanner. + * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. 
* * @see SpannerIO */ @Experimental(Experimental.Kind.SOURCE_SINK) @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> { - - abstract SpannerConfig getSpannerConfig(); - - @Nullable - abstract TimestampBound getTimestampBound(); - - @Nullable - abstract Statement getQuery(); + public abstract static class Write extends PTransform<PCollection<Mutation>, PDone> { @Nullable - abstract String getTable(); + abstract String getProjectId(); @Nullable - abstract String getIndex(); + abstract String getInstanceId(); @Nullable - abstract List<String> getColumns(); + abstract String getDatabaseId(); - @Nullable - abstract KeySet getKeySet(); + abstract long getBatchSizeBytes(); @Nullable - abstract PCollectionView<Transaction> getTransaction(); + @VisibleForTesting + abstract ServiceFactory<Spanner, SpannerOptions> getServiceFactory(); abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setTimestampBound(TimestampBound timestampBound); - - abstract Builder setQuery(Statement statement); - - abstract Builder setTable(String table); - - abstract Builder setIndex(String index); + abstract Builder setProjectId(String projectId); - abstract Builder setColumns(List<String> columns); + abstract Builder setInstanceId(String instanceId); - abstract Builder setKeySet(KeySet keySet); + abstract Builder setDatabaseId(String databaseId); - abstract Builder setTransaction(PCollectionView<Transaction> transaction); - - abstract Read build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public Read withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } + abstract Builder setBatchSizeBytes(long batchSizeBytes); - /** Specifies the Cloud Spanner project. 
*/ - public Read withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } + @VisibleForTesting + abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory); - /** Specifies the Cloud Spanner project. */ - public Read withProjectId(ValueProvider<String> projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); + abstract Write build(); } - /** Specifies the Cloud Spanner instance. */ - public Read withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner project. + * + * <p>Does not modify this object. + */ + public Write withProjectId(String projectId) { + return toBuilder().setProjectId(projectId).build(); } - /** Specifies the Cloud Spanner instance. */ - public Read withInstanceId(ValueProvider<String> instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * instance. + * + * <p>Does not modify this object. + */ + public Write withInstanceId(String instanceId) { + return toBuilder().setInstanceId(instanceId).build(); } - /** Specifies the Cloud Spanner database. */ - public Read withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + /** + * Returns a new {@link SpannerIO.Write} with a new batch size limit. + * + * <p>Does not modify this object. + */ + public Write withBatchSizeBytes(long batchSizeBytes) { + return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } - /** Specifies the Cloud Spanner database. 
*/ - public Read withDatabaseId(ValueProvider<String> databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * database. + * + * <p>Does not modify this object. + */ + public Write withDatabaseId(String databaseId) { + return toBuilder().setDatabaseId(databaseId).build(); } @VisibleForTesting - Read withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - public Read withTransaction(PCollectionView<Transaction> transaction) { - return toBuilder().setTransaction(transaction).build(); - } - - public Read withTimestamp(Timestamp timestamp) { - return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); - } - - public Read withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); - } - - public Read withTable(String table) { - return toBuilder().setTable(table).build(); - } - - public Read withColumns(String... 
columns) { - return withColumns(Arrays.asList(columns)); - } - - public Read withColumns(List<String> columns) { - return toBuilder().setColumns(columns).build(); - } - - public Read withQuery(Statement statement) { - return toBuilder().setQuery(statement).build(); - } - - public Read withQuery(String sql) { - return withQuery(Statement.of(sql)); - } - - public Read withKeySet(KeySet keySet) { - return toBuilder().setKeySet(keySet).build(); - } - - public Read withIndex(String index) { - return toBuilder().setIndex(index).build(); + Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { + return toBuilder().setServiceFactory(serviceFactory).build(); } - @Override public void validate(PipelineOptions options) { - getSpannerConfig().validate(options); checkNotNull( - getTimestampBound(), - "SpannerIO.read() runs in a read only transaction and requires timestamp to be set " - + "with withTimestampBound or withTimestamp method"); - - if (getQuery() != null) { - // TODO: validate query? 
- } else if (getTable() != null) { - // Assume read - checkNotNull( - getColumns(), - "For a read operation SpannerIO.read() requires a list of " - + "columns to set with withColumns method"); - checkArgument( - !getColumns().isEmpty(), - "For a read operation SpannerIO.read() requires a" - + " list of columns to set with withColumns method"); - } else { - throw new IllegalArgumentException( - "SpannerIO.read() requires configuring query or read operation."); - } + getInstanceId(), + "SpannerIO.write() requires instance id to be set with withInstanceId method"); + checkNotNull( + getDatabaseId(), + "SpannerIO.write() requires database id to be set with withDatabaseId method"); } @Override - public PCollection<Struct> expand(PBegin input) { - Read config = this; - List<PCollectionView<Transaction>> sideInputs = Collections.emptyList(); - if (getTimestampBound() != null) { - PCollectionView<Transaction> transaction = - input.apply(createTransaction().withSpannerConfig(getSpannerConfig())); - config = config.withTransaction(transaction); - sideInputs = Collections.singletonList(transaction); - } - return input - .apply(Create.of(1)) - .apply( - "Execute query", ParDo.of(new NaiveSpannerReadFn(config)).withSideInputs(sideInputs)); - } - } - - /** - * A {@link PTransform} that create a transaction. - * - * @see SpannerIO - */ - @Experimental(Experimental.Kind.SOURCE_SINK) - @AutoValue - public abstract static class CreateTransaction - extends PTransform<PBegin, PCollectionView<Transaction>> { - - abstract SpannerConfig getSpannerConfig(); - - @Nullable - abstract TimestampBound getTimestampBound(); - - abstract Builder toBuilder(); - - @Override - public PCollectionView<Transaction> expand(PBegin input) { - return input.apply(Create.of(1)) - .apply("Create transaction", ParDo.of(new CreateTransactionFn(this))) - .apply("As PCollectionView", View.<Transaction>asSingleton()); - } - - /** Specifies the Cloud Spanner configuration. 
*/ - public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public CreateTransaction withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public CreateTransaction withProjectId(ValueProvider<String> projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public CreateTransaction withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. */ - public CreateTransaction withInstanceId(ValueProvider<String> instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public CreateTransaction withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner database. 
*/ - public CreateTransaction withDatabaseId(ValueProvider<String> databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - @VisibleForTesting - CreateTransaction withServiceFactory( - ServiceFactory<Spanner, SpannerOptions> serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - public CreateTransaction withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); + public PDone expand(PCollection<Mutation> input) { + input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteFn(this))); + return PDone.in(input.getPipeline()); } @Override - public void validate(PipelineOptions options) { - getSpannerConfig().validate(options); - } - - /** A builder for {@link CreateTransaction}. */ - @AutoValue.Builder public abstract static class Builder { - - public abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - public abstract Builder setTimestampBound(TimestampBound newTimestampBound); - - public abstract CreateTransaction build(); + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) + .addIfNotNull( + DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) + .addIfNotNull( + DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")) + .add(DisplayData.item("batchSizeBytes", getBatchSizeBytes()) + .withLabel("Batch Size in Bytes")); + if (getServiceFactory() != null) { + builder.addIfNotNull( + DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) + .withLabel("Service Factory")); + } } } + /** Batches together and writes mutations to Google Cloud Spanner. 
*/ + @VisibleForTesting + static class SpannerWriteFn extends DoFn<Mutation, Void> { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteFn.class); + private final Write spec; + private transient Spanner spanner; + private transient DatabaseClient dbClient; + // Current batch of mutations to be written. + private List<Mutation> mutations; + private long batchSizeBytes = 0; + + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES) + .withInitialBackoff(Duration.standardSeconds(5)); - /** - * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. - * - * @see SpannerIO - */ - @Experimental(Experimental.Kind.SOURCE_SINK) - @AutoValue - public abstract static class Write extends PTransform<PCollection<Mutation>, PDone> { - - abstract SpannerConfig getSpannerConfig(); - - abstract long getBatchSizeBytes(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setBatchSizeBytes(long batchSizeBytes); - - abstract Write build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public Write withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public Write withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + @VisibleForTesting + SpannerWriteFn(Write spec) { + this.spec = spec; } - /** Specifies the Cloud Spanner project. 
*/ - public Write withProjectId(ValueProvider<String> projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); + @Setup + public void setup() throws Exception { + SpannerOptions spannerOptions = getSpannerOptions(); + spanner = spannerOptions.getService(); + dbClient = spanner.getDatabaseClient( + DatabaseId.of(projectId(), spec.getInstanceId(), spec.getDatabaseId())); + mutations = new ArrayList<>(); + batchSizeBytes = 0; } - /** Specifies the Cloud Spanner instance. */ - public Write withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Mutation m = c.element(); + mutations.add(m); + batchSizeBytes += MutationSizeEstimator.sizeOf(m); + if (batchSizeBytes >= spec.getBatchSizeBytes()) { + flushBatch(); + } } - /** Specifies the Cloud Spanner instance. */ - public Write withInstanceId(ValueProvider<String> instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); + private String projectId() { + return spec.getProjectId() == null + ? ServiceOptions.getDefaultProjectId() + : spec.getProjectId(); } - /** Specifies the Cloud Spanner database. */ - public Write withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + @FinishBundle + public void finishBundle() throws Exception { + if (!mutations.isEmpty()) { + flushBatch(); + } } - /** Specifies the Cloud Spanner database. 
*/ - public Write withDatabaseId(ValueProvider<String> databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); + @Teardown + public void teardown() throws Exception { + if (spanner == null) { + return; + } + spanner.closeAsync().get(); + spanner = null; } - @VisibleForTesting - Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); + private SpannerOptions getSpannerOptions() { + SpannerOptions.Builder spannerOptionsBuider = SpannerOptions.newBuilder(); + if (spec.getServiceFactory() != null) { + spannerOptionsBuider.setServiceFactory(spec.getServiceFactory()); + } + if (spec.getProjectId() != null) { + spannerOptionsBuider.setProjectId(spec.getProjectId()); + } + return spannerOptionsBuider.build(); } /** - * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. + * Writes a batch of mutations to Cloud Spanner. + * + * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit + * is exceeded, the last exception from Cloud Spanner will be thrown. + * + * @throws AbortedException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. */ - public WriteGrouped grouped() { - return new WriteGrouped(this); - } - - /** Specifies the batch size limit. 
*/ - public Write withBatchSizeBytes(long batchSizeBytes) { - return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); - } - - @Override - public void validate(PipelineOptions options) { - getSpannerConfig().validate(options); - } - - @Override - public PDone expand(PCollection<Mutation> input) { - input - .apply("To mutation group", ParDo.of(new ToMutationGroupFn())) - .apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(this))); - return PDone.in(input.getPipeline()); + private void flushBatch() throws AbortedException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + + while (true) { + // Batch upsert rows. + try { + dbClient.writeAtLeastOnce(mutations); + + // Break if the commit threw no exception. + break; + } catch (AbortedException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry. + LOG.error( + "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations = new ArrayList<>(); + batchSizeBytes = 0; } - @Override - public void populateDisplayData(DisplayData.Builder builder) { + public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - getSpannerConfig().populateDisplayData(builder); - builder.add( - DisplayData.item("batchSizeBytes", getBatchSizeBytes()).withLabel("Batch Size in Bytes")); - } - } - - /** Same as {@link Write} but supports grouped mutations. 
*/ - public static class WriteGrouped extends PTransform<PCollection<MutationGroup>, PDone> { - private final Write spec; - - public WriteGrouped(Write spec) { - this.spec = spec; - } - - @Override public PDone expand(PCollection<MutationGroup> input) { - input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(spec))); - return PDone.in(input.getPipeline()); - } - } - - private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Mutation value = c.element(); - c.output(MutationGroup.create(value)); + spec.populateDisplayData(builder); } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java deleted file mode 100644 index 34a11da..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.AbortedException; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.BackOffUtils; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Batches together and writes mutations to Google Cloud Spanner. */ -@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn<MutationGroup, Void> { - private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); - private final SpannerIO.Write spec; - // Current batch of mutations to be written. 
- private List<MutationGroup> mutations; - private long batchSizeBytes = 0; - - private static final int MAX_RETRIES = 5; - private static final FluentBackoff BUNDLE_WRITE_BACKOFF = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES) - .withInitialBackoff(Duration.standardSeconds(5)); - - @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) { - this.spec = spec; - } - - @Override SpannerConfig getSpannerConfig() { - return spec.getSpannerConfig(); - } - - @Setup - public void setup() throws Exception { - super.setup(); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - MutationGroup m = c.element(); - mutations.add(m); - batchSizeBytes += MutationSizeEstimator.sizeOf(m); - if (batchSizeBytes >= spec.getBatchSizeBytes()) { - flushBatch(); - } - } - - @FinishBundle - public void finishBundle() throws Exception { - if (!mutations.isEmpty()) { - flushBatch(); - } - } - - /** - * Writes a batch of mutations to Cloud Spanner. - * - * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit - * is exceeded, the last exception from Cloud Spanner will be thrown. - * - * @throws AbortedException if the commit fails or IOException or InterruptedException if - * backing off between retries fails. - */ - private void flushBatch() throws AbortedException, IOException, InterruptedException { - LOG.debug("Writing batch of {} mutations", mutations.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); - - while (true) { - // Batch upsert rows. - try { - databaseClient().writeAtLeastOnce(Iterables.concat(mutations)); - - // Break if the commit threw no exception. - break; - } catch (AbortedException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. 
- LOG.error( - "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - LOG.debug("Successfully wrote {} mutations", mutations.size()); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - spec.populateDisplayData(builder); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java deleted file mode 100644 index 22af3b8..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.auto.value.AutoValue; -import com.google.cloud.Timestamp; -import java.io.Serializable; - -/** A transaction object. */ -@AutoValue -public abstract class Transaction implements Serializable { - - abstract Timestamp timestamp(); - - public static Transaction create(Timestamp timestamp) { - return new AutoValue_Transaction(timestamp); - } -}
