Repository: beam Updated Branches: refs/heads/master c8e3744ad -> afeba3715
Introduces SpannerIO.readAll() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95e9c28c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95e9c28c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95e9c28c Branch: refs/heads/master Commit: 95e9c28ca4da5bac31f3d768595693e43b464c1c Parents: c8e3744 Author: Mairbek Khadikov <[email protected]> Authored: Tue Jul 18 16:23:58 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Thu Jul 20 10:58:51 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 35 ++-- .../beam/sdk/io/gcp/spanner/ReadOperation.java | 96 ++++++++++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 187 ++++++++++++++----- .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 145 +++++++++----- 4 files changed, 353 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java index d193b95..92b3fe3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java @@ -22,44 +22,53 @@ import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; import com.google.common.annotations.VisibleForTesting; +import javax.annotation.Nullable; +import org.apache.beam.sdk.values.PCollectionView; /** A simplest read function implementation. Parallelism support is coming. */ @VisibleForTesting -class NaiveSpannerReadFn extends AbstractSpannerFn<Object, Struct> { - private final SpannerIO.Read config; +class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> { + private final SpannerConfig config; + @Nullable private final PCollectionView<Transaction> transaction; - NaiveSpannerReadFn(SpannerIO.Read config) { + NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) { this.config = config; + this.transaction = transaction; + } + + NaiveSpannerReadFn(SpannerConfig config) { + this(config, null); } SpannerConfig getSpannerConfig() { - return config.getSpannerConfig(); + return config; } @ProcessElement public void processElement(ProcessContext c) throws Exception { TimestampBound timestampBound = TimestampBound.strong(); - if (config.getTransaction() != null) { - Transaction transaction = c.sideInput(config.getTransaction()); + if (transaction != null) { + Transaction transaction = c.sideInput(this.transaction); timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp()); } + ReadOperation op = c.element(); try (ReadOnlyTransaction readOnlyTransaction = databaseClient().readOnlyTransaction(timestampBound)) { - ResultSet resultSet = execute(readOnlyTransaction); + ResultSet resultSet = execute(op, readOnlyTransaction); while (resultSet.next()) { c.output(resultSet.getCurrentRowAsStruct()); } } } - private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) { - if (config.getQuery() != null) { - return readOnlyTransaction.executeQuery(config.getQuery()); + private ResultSet execute(ReadOperation op, ReadOnlyTransaction readOnlyTransaction) { + if (op.getQuery() != null) { + return readOnlyTransaction.executeQuery(op.getQuery()); } - if (config.getIndex() != null) { + if (op.getIndex() != null) { return readOnlyTransaction.readUsingIndex( - config.getTable(), config.getIndex(), config.getKeySet(), config.getColumns()); + op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns()); } - return readOnlyTransaction.read(config.getTable(), config.getKeySet(), config.getColumns()); + return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java new file mode 100644 index 0000000..3b2bb6b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.auto.value.AutoValue; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Statement; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import javax.annotation.Nullable; + +/** Encapsulates a spanner read operation. */ +@AutoValue +public abstract class ReadOperation implements Serializable { + + public static ReadOperation create() { + return new AutoValue_ReadOperation.Builder().setKeySet(KeySet.all()).build(); + } + + @Nullable + public abstract Statement getQuery(); + + @Nullable + public abstract String getTable(); + + @Nullable + public abstract String getIndex(); + + @Nullable + public abstract List<String> getColumns(); + + @Nullable + public abstract KeySet getKeySet(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setQuery(Statement statement); + + abstract Builder setTable(String table); + + abstract Builder setIndex(String index); + + abstract Builder setColumns(List<String> columns); + + abstract Builder setKeySet(KeySet keySet); + + abstract ReadOperation build(); + } + + abstract Builder toBuilder(); + + public ReadOperation withTable(String table) { + return toBuilder().setTable(table).build(); + } + + public ReadOperation withColumns(String... columns) { + return withColumns(Arrays.asList(columns)); + } + + public ReadOperation withColumns(List<String> columns) { + return toBuilder().setColumns(columns).build(); + } + + public ReadOperation withQuery(Statement statement) { + return toBuilder().setQuery(statement).build(); + } + + public ReadOperation withQuery(String sql) { + return withQuery(Statement.of(sql)); + } + + public ReadOperation withKeySet(KeySet keySet) { + return toBuilder().setKeySet(keySet).build(); + } + + public ReadOperation withIndex(String index) { + return toBuilder().setIndex(index).build(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index a247d4c..e5c9c05 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -31,12 +31,11 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; import com.google.common.annotations.VisibleForTesting; - import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import javax.annotation.Nullable; - import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -44,7 +43,11 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -172,7 +175,18 @@ public class SpannerIO { return new AutoValue_SpannerIO_Read.Builder() .setSpannerConfig(SpannerConfig.create()) .setTimestampBound(TimestampBound.strong()) - .setKeySet(KeySet.all()) + .setReadOperation(ReadOperation.create()) + .build(); + } + + /** + * A {@link PTransform} that works like {@link #read}, but executes read operations coming from a + * {@link PCollection}. + */ + @Experimental(Experimental.Kind.SOURCE_SINK) + public static ReadAll readAll() { + return new AutoValue_SpannerIO_ReadAll.Builder() + .setSpannerConfig(SpannerConfig.create()) .build(); } @@ -202,34 +216,113 @@ public class SpannerIO { .build(); } - /** - * A {@link PTransform} that reads data from Google Cloud Spanner. - * - * @see SpannerIO - */ + /** Implementation of {@link #readAll}. */ @Experimental(Experimental.Kind.SOURCE_SINK) @AutoValue - public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> { + public abstract static class ReadAll + extends PTransform<PCollection<ReadOperation>, PCollection<Struct>> { abstract SpannerConfig getSpannerConfig(); @Nullable - abstract TimestampBound getTimestampBound(); + abstract PCollectionView<Transaction> getTransaction(); - @Nullable - abstract Statement getQuery(); + abstract Builder toBuilder(); - @Nullable - abstract String getTable(); + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - @Nullable - abstract String getIndex(); + abstract Builder setTransaction(PCollectionView<Transaction> transaction); - @Nullable - abstract List<String> getColumns(); + abstract ReadAll build(); + } + + /** Specifies the Cloud Spanner configuration. */ + public ReadAll withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + /** Specifies the Cloud Spanner project. */ + public ReadAll withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + /** Specifies the Cloud Spanner project. */ + public ReadAll withProjectId(ValueProvider<String> projectId) { + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withProjectId(projectId)); + } + + /** Specifies the Cloud Spanner instance. */ + public ReadAll withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + /** Specifies the Cloud Spanner instance. */ + public ReadAll withInstanceId(ValueProvider<String> instanceId) { + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withInstanceId(instanceId)); + } + + /** Specifies the Cloud Spanner database. */ + public ReadAll withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } + + /** Specifies the Cloud Spanner database. */ + public ReadAll withDatabaseId(ValueProvider<String> databaseId) { + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withDatabaseId(databaseId)); + } + + @VisibleForTesting + ReadAll withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) { + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withServiceFactory(serviceFactory)); + } + + public ReadAll withTransaction(PCollectionView<Transaction> transaction) { + return toBuilder().setTransaction(transaction).build(); + } + + @Override + public PCollection<Struct> expand(PCollection<ReadOperation> input) { + PCollection<ReadOperation> reshuffled = + input + .apply( + "Pair with random key", + WithKeys.of( + new SerializableFunction<ReadOperation, String>() { + @Override + public String apply(ReadOperation ignored) { + return UUID.randomUUID().toString(); + } + })) + .apply("Reshuffle", Reshuffle.<String, ReadOperation>of()) + .apply("Strip keys", Values.<ReadOperation>create()); + List<PCollectionView<Transaction>> sideInputs = + getTransaction() == null + ? Collections.<PCollectionView<Transaction>>emptyList() + : Collections.singletonList(getTransaction()); + return reshuffled.apply( + "Execute queries", + ParDo.of(new NaiveSpannerReadFn(getSpannerConfig(), getTransaction())) + .withSideInputs(sideInputs)); + } + } + + /** Implementation of {@link #read}. */ + @Experimental(Experimental.Kind.SOURCE_SINK) + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> { + + abstract SpannerConfig getSpannerConfig(); + + abstract ReadOperation getReadOperation(); @Nullable - abstract KeySet getKeySet(); + abstract TimestampBound getTimestampBound(); @Nullable abstract PCollectionView<Transaction> getTransaction(); @@ -241,17 +334,9 @@ public class SpannerIO { abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - abstract Builder setTimestampBound(TimestampBound timestampBound); - - abstract Builder setQuery(Statement statement); - - abstract Builder setTable(String table); - - abstract Builder setIndex(String index); + abstract Builder setReadOperation(ReadOperation readOperation); - abstract Builder setColumns(List<String> columns); - - abstract Builder setKeySet(KeySet keySet); + abstract Builder setTimestampBound(TimestampBound timestampBound); abstract Builder setTransaction(PCollectionView<Transaction> transaction); @@ -315,7 +400,11 @@ public class SpannerIO { } public Read withTable(String table) { - return toBuilder().setTable(table).build(); + return withReadOperation(getReadOperation().withTable(table)); + } + + public Read withReadOperation(ReadOperation operation) { + return toBuilder().setReadOperation(operation).build(); } public Read withColumns(String... columns) { @@ -323,11 +412,11 @@ public class SpannerIO { } public Read withColumns(List<String> columns) { - return toBuilder().setColumns(columns).build(); + return withReadOperation(getReadOperation().withColumns(columns)); } public Read withQuery(Statement statement) { - return toBuilder().setQuery(statement).build(); + return withReadOperation(getReadOperation().withQuery(statement)); } public Read withQuery(String sql) { @@ -335,14 +424,13 @@ public class SpannerIO { } public Read withKeySet(KeySet keySet) { - return toBuilder().setKeySet(keySet).build(); + return withReadOperation(getReadOperation().withKeySet(keySet)); } public Read withIndex(String index) { - return toBuilder().setIndex(index).build(); + return withReadOperation(getReadOperation().withIndex(index)); } - @Override public void validate(PipelineOptions options) { getSpannerConfig().validate(options); @@ -351,16 +439,16 @@ public class SpannerIO { "SpannerIO.read() runs in a read only transaction and requires timestamp to be set " + "with withTimestampBound or withTimestamp method"); - if (getQuery() != null) { + if (getReadOperation().getQuery() != null) { // TODO: validate query? - } else if (getTable() != null) { + } else if (getReadOperation().getTable() != null) { // Assume read checkNotNull( - getColumns(), + getReadOperation().getColumns(), "For a read operation SpannerIO.read() requires a list of " + "columns to set with withColumns method"); checkArgument( - !getColumns().isEmpty(), + !getReadOperation().getColumns().isEmpty(), "For a read operation SpannerIO.read() requires a" + " list of columns to set with withColumns method"); } else { @@ -371,18 +459,17 @@ public class SpannerIO { @Override public PCollection<Struct> expand(PBegin input) { - Read config = this; - List<PCollectionView<Transaction>> sideInputs = Collections.emptyList(); - if (getTimestampBound() != null) { - PCollectionView<Transaction> transaction = - input.apply(createTransaction().withSpannerConfig(getSpannerConfig())); - config = config.withTransaction(transaction); - sideInputs = Collections.singletonList(transaction); + PCollectionView<Transaction> transaction = getTransaction(); + if (transaction == null && getTimestampBound() != null) { + transaction = + input.apply( + createTransaction() + .withTimestampBound(getTimestampBound()) + .withSpannerConfig(getSpannerConfig())); } - return input - .apply(Create.of(1)) - .apply( - "Execute query", ParDo.of(new NaiveSpannerReadFn(config)).withSideInputs(sideInputs)); + ReadAll readAll = + readAll().withSpannerConfig(getSpannerConfig()).withTransaction(transaction); + return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); } } http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 5ba2da0..6eb1a33 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -39,6 +39,7 @@ import java.util.List; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -55,6 +56,7 @@ import org.mockito.Mockito; /** Unit tests for {@link SpannerIO}. */ @RunWith(JUnit4.class) public class SpannerIOReadTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Rule @@ -63,12 +65,16 @@ public class SpannerIOReadTest implements Serializable { private FakeServiceFactory serviceFactory; private ReadOnlyTransaction mockTx; - private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()), - Type.StructField.of("name", Type.string())); + private static final Type FAKE_TYPE = + Type.struct( + Type.StructField.of("id", Type.int64()), Type.StructField.of("name", Type.string())); - private List<Struct> fakeRows = Arrays.asList( - Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(), - Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build()); + private static final List<Struct> FAKE_ROWS = + Arrays.asList( + Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(), + Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build(), + Struct.newBuilder().add("id", Value.int64(3)).add("name", Value.string("Carl")).build(), + Struct.newBuilder().add("id", Value.int64(4)).add("name", Value.string("Dan")).build()); @Before @SuppressWarnings("unchecked") @@ -153,20 +159,19 @@ public class SpannerIOReadTest implements Serializable { .withProjectId("test") .withInstanceId("123") .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) .withQuery("SELECT * FROM users") .withServiceFactory(serviceFactory); - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig()); + DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn); when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) .thenReturn(mockTx); when(mockTx.executeQuery(any(Statement.class))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); + List<Struct> result = fnTester.processBundle(read.getReadOperation()); + assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray())); verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound .strong()); @@ -180,21 +185,20 @@ public class SpannerIOReadTest implements Serializable { .withProjectId("test") .withInstanceId("123") .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) .withTable("users") .withColumns("id", "name") .withServiceFactory(serviceFactory); - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig()); + DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn); when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) .thenReturn(mockTx); when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); + List<Struct> result = fnTester.processBundle(read.getReadOperation()); + assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray())); verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name")); @@ -213,16 +217,16 @@ public class SpannerIOReadTest implements Serializable { .withIndex("theindex") .withServiceFactory(serviceFactory); - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig()); + DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn); when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) .thenReturn(mockTx); when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); + List<Struct> result = fnTester.processBundle(read.getReadOperation()); + assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray())); verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")); @@ -233,30 +237,32 @@ public class SpannerIOReadTest implements Serializable { public void readPipeline() throws Exception { Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - PCollectionView<Transaction> tx = pipeline - .apply("tx", SpannerIO.createTransaction() + SpannerConfig spannerConfig = + SpannerConfig.create() .withProjectId("test") .withInstanceId("123") .withDatabaseId("aaa") - .withServiceFactory(serviceFactory)); - - PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withQuery("SELECT * FROM users") - .withServiceFactory(serviceFactory) - .withTransaction(tx)); - PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name") - .withServiceFactory(serviceFactory) - .withTransaction(tx)); + .withServiceFactory(serviceFactory); + + PCollectionView<Transaction> tx = + pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig)); + + PCollection<Struct> one = + pipeline.apply( + "read q", + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT * FROM users") + .withTransaction(tx)); + PCollection<Struct> two = + pipeline.apply( + "read r", + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withTimestamp(Timestamp.now()) + .withTable("users") + .withColumns("id", "name") + .withTransaction(tx)); when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) .thenReturn(mockTx); @@ -265,13 +271,58 @@ public class SpannerIOReadTest implements Serializable { Collections.<Struct>emptyList())); when(mockTx.executeQuery(Statement.of("SELECT * FROM users"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); + when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS)); + when(mockTx.getReadTimestamp()).thenReturn(timestamp); + + PAssert.that(one).containsInAnyOrder(FAKE_ROWS); + PAssert.that(two).containsInAnyOrder(FAKE_ROWS); + + pipeline.run(); + + verify(serviceFactory.mockDatabaseClient(), times(2)) + .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp)); + } + + @Test + @Category(NeedsRunner.class) + public void readAllPipeline() throws Exception { + Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); + + SpannerConfig spannerConfig = + SpannerConfig.create() + .withProjectId("test") + .withInstanceId("123") + .withDatabaseId("aaa") + .withServiceFactory(serviceFactory); + + PCollectionView<Transaction> tx = + pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig)); + + PCollection<ReadOperation> reads = + pipeline.apply( + Create.of( + ReadOperation.create().withQuery("SELECT * FROM users"), + ReadOperation.create().withTable("users").withColumns("id", "name"))); + + PCollection<Struct> one = + reads.apply( + "read all", SpannerIO.readAll().withSpannerConfig(spannerConfig).withTransaction(tx)); + + when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) + .thenReturn(mockTx); + + when(mockTx.executeQuery(Statement.of("SELECT 1"))) + .thenReturn(ResultSets.forRows(Type.struct(), Collections.<Struct>emptyList())); + + when(mockTx.executeQuery(Statement.of("SELECT * FROM users"))) + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2))); when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4))); when(mockTx.getReadTimestamp()).thenReturn(timestamp); - PAssert.that(one).containsInAnyOrder(fakeRows); - PAssert.that(two).containsInAnyOrder(fakeRows); + PAssert.that(one).containsInAnyOrder(FAKE_ROWS); pipeline.run();
