Repository: beam Updated Branches: refs/heads/master 996e35c1d -> 646caf255
SpannerIO: Introduced a MutationGroup. Allows grouping mutations together into a logical bundle that is submitted in the same transaction. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9115af48 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9115af48 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9115af48 Branch: refs/heads/master Commit: 9115af488ceb907de121313ffa096d58a0ccc1e1 Parents: 996e35c Author: Mairbek Khadikov <[email protected]> Authored: Wed Jun 7 16:27:01 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Jun 13 13:28:25 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/spanner/MutationGroup.java | 67 +++++++++++++++++ .../io/gcp/spanner/MutationSizeEstimator.java | 9 +++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 53 +++++++++++--- .../gcp/spanner/MutationSizeEstimatorTest.java | 12 ++++ .../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 76 ++++++++++++++++---- 5 files changed, 197 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java new file mode 100644 index 0000000..5b08da2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.Mutation; +import com.google.common.collect.ImmutableList; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * A bundle of mutations that must be submitted atomically. + * + * <p>One of the mutations is chosen to be "primary", and can be used to determine partitions. + */ +public final class MutationGroup implements Serializable, Iterable<Mutation> { + private final ImmutableList<Mutation> mutations; + + /** + * Creates a new group. + * + * @param primary a primary mutation. + * @param other other mutations, usually interleaved in parent. + * @return new mutation group. + */ + public static MutationGroup create(Mutation primary, Mutation... 
other) { + return create(primary, Arrays.asList(other)); + } + + public static MutationGroup create(Mutation primary, Iterable<Mutation> other) { + return new MutationGroup(ImmutableList.<Mutation>builder().add(primary).addAll(other).build()); + } + + @Override + public Iterator<Mutation> iterator() { + return mutations.iterator(); + } + + private MutationGroup(ImmutableList<Mutation> mutations) { + this.mutations = mutations; + } + + public Mutation primary() { + return mutations.get(0); + } + + public List<Mutation> attached() { + return mutations.subList(1, mutations.size()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java index 61652e7..2418816 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java @@ -44,6 +44,15 @@ class MutationSizeEstimator { return result; } + /** Estimates a size of the mutation group in bytes. 
*/ + public static long sizeOf(MutationGroup group) { + long result = 0; + for (Mutation m : group) { + result += sizeOf(m); + } + return result; + } + private static long estimatePrimitiveValue(Value v) { switch (v.getType().getCode()) { case BOOL: http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 5058d13..af5253b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -29,10 +29,12 @@ import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import java.io.IOException; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -88,6 +90,11 @@ import org.slf4j.LoggerFactory; * <li>If the pipeline was unexpectedly stopped, mutations that were already applied will not get * rolled back. * </ul> + * + * <p>Use {@link MutationGroup} to ensure that a small set of mutations is bundled together. It is + * guaranteed that mutations in a group are submitted in the same transaction. Build a + * {@link SpannerIO.Write} transform, and call the {@link Write#grouped()} method. It will return a + * transformation that can be applied to a PCollection of MutationGroup. 
*/ @Experimental(Experimental.Kind.SOURCE_SINK) public class SpannerIO { @@ -187,6 +194,13 @@ public class SpannerIO { return toBuilder().setDatabaseId(databaseId).build(); } + /** + * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. + */ + public WriteGrouped grouped() { + return new WriteGrouped(this); + } + @VisibleForTesting Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { return toBuilder().setServiceFactory(serviceFactory).build(); @@ -204,7 +218,9 @@ public class SpannerIO { @Override public PDone expand(PCollection<Mutation> input) { - input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteFn(this))); + input + .apply("To mutation group", ParDo.of(new ToMutationGroupFn())) + .apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(this))); return PDone.in(input.getPipeline()); } @@ -227,15 +243,37 @@ public class SpannerIO { } } + /** Same as {@link Write} but supports grouped mutations. */ + public static class WriteGrouped extends PTransform<PCollection<MutationGroup>, PDone> { + private final Write spec; + + public WriteGrouped(Write spec) { + this.spec = spec; + } + + @Override public PDone expand(PCollection<MutationGroup> input) { + input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(spec))); + return PDone.in(input.getPipeline()); + } + } + + private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Mutation value = c.element(); + c.output(MutationGroup.create(value)); + } + } + /** Batches together and writes mutations to Google Cloud Spanner. 
*/ @VisibleForTesting - static class SpannerWriteFn extends DoFn<Mutation, Void> { - private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteFn.class); + static class SpannerWriteGroupFn extends DoFn<MutationGroup, Void> { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); private final Write spec; private transient Spanner spanner; private transient DatabaseClient dbClient; // Current batch of mutations to be written. - private List<Mutation> mutations; + private List<MutationGroup> mutations; private long batchSizeBytes = 0; private static final int MAX_RETRIES = 5; @@ -244,8 +282,7 @@ public class SpannerIO { .withMaxRetries(MAX_RETRIES) .withInitialBackoff(Duration.standardSeconds(5)); - @VisibleForTesting - SpannerWriteFn(Write spec) { + @VisibleForTesting SpannerWriteGroupFn(Write spec) { this.spec = spec; } @@ -261,7 +298,7 @@ public class SpannerIO { @ProcessElement public void processElement(ProcessContext c) throws Exception { - Mutation m = c.element(); + MutationGroup m = c.element(); mutations.add(m); batchSizeBytes += MutationSizeEstimator.sizeOf(m); if (batchSizeBytes >= spec.getBatchSizeBytes()) { @@ -319,7 +356,7 @@ public class SpannerIO { while (true) { // Batch upsert rows. try { - dbClient.writeAtLeastOnce(mutations); + dbClient.writeAtLeastOnce(Iterables.concat(mutations)); // Break if the commit threw no exception. 
break; http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java index 03eb28e..013b83d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java @@ -135,4 +135,16 @@ public class MutationSizeEstimatorTest { assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L)); assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L)); } + + @Test + public void group() throws Exception { + Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build(); + Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build(); + + MutationGroup group = MutationGroup.create(int64, float64, bool); + + assertThat(MutationSizeEstimator.sizeOf(group), is(17L)); + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/9115af48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 5bdfea5..4a759fb 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -114,9 +114,31 @@ public class SpannerIOTest implements Serializable { } @Test - public void batching() throws Exception { + @Category(NeedsRunner.class) + public void singleMutationGroupPipeline() throws Exception { Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + PCollection<MutationGroup> mutations = pipeline + .apply(Create.<MutationGroup>of(g(one, two, three))); + mutations.apply( + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withServiceFactory(serviceFactory) + .grouped()); + pipeline.run(); + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(3))); + } + + @Test + public void batching() throws Exception { + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); SpannerIO.Write write = SpannerIO.write() .withProjectId("test-project") @@ -124,8 +146,8 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(1000000000) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester<MutationGroup, Void> 
fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); verify(serviceFactory.mockSpanner()) @@ -136,9 +158,9 @@ public class SpannerIOTest implements Serializable { @Test public void batchingGroups() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); - Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); + MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build()); // Have a room to accumulate one more item. long batchSize = MutationSizeEstimator.sizeOf(one) + 1; @@ -150,8 +172,8 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two, three)); verify(serviceFactory.mockSpanner()) @@ -164,8 +186,8 @@ public class SpannerIOTest implements Serializable { @Test public void noBatching() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); SpannerIO.Write write = SpannerIO.write() 
.withProjectId("test-project") @@ -173,8 +195,8 @@ public class SpannerIOTest implements Serializable { .withDatabaseId("test-database") .withBatchSizeBytes(0) // turn off batching. .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); verify(serviceFactory.mockSpanner()) @@ -183,6 +205,32 @@ public class SpannerIOTest implements Serializable { .writeAtLeastOnce(argThat(new IterableOfSize(1))); } + @Test + public void groups() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + + // Smallest batch size + long batchSize = 1; + + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(batchSize) + .withServiceFactory(serviceFactory); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); + fnTester.processBundle(Arrays.asList(g(one, two, three))); + + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(3))); + } + private static class FakeServiceFactory implements ServiceFactory<Spanner, SpannerOptions>, Serializable { // Marked as static so they could be returned by serviceFactory, which is serializable. 
@@ -241,4 +289,8 @@ public class SpannerIOTest implements Serializable { return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size; } } + + private static MutationGroup g(Mutation m, Mutation... other) { + return MutationGroup.create(m, other); + } }
