Repository: beam Updated Branches: refs/heads/master 608a9c459 -> 10e47646d
Support ValueProviders in SpannerIO.Write Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/42a2de91 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/42a2de91 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/42a2de91 Branch: refs/heads/master Commit: 42a2de91adf1387bb8eaf9aa515a24f6f276bf40 Parents: 608a9c4 Author: Mairbek Khadikov <mair...@google.com> Authored: Wed Jun 14 13:03:36 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Jun 20 14:25:51 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 31 ++++++++++++++------ .../beam/sdk/io/gcp/spanner/SpannerIOTest.java | 21 +++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/42a2de91/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index af5253b..8bfc247 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -123,13 +124,13 @@ public class SpannerIO { public abstract static class Write extends PTransform<PCollection<Mutation>, PDone> { @Nullable - abstract String getProjectId(); + abstract ValueProvider<String> getProjectId(); @Nullable - abstract String getInstanceId(); + abstract ValueProvider<String> getInstanceId(); @Nullable - abstract String getDatabaseId(); + abstract ValueProvider<String> getDatabaseId(); abstract long getBatchSizeBytes(); @@ -142,11 +143,11 @@ public class SpannerIO { @AutoValue.Builder abstract static class Builder { - abstract Builder setProjectId(String projectId); + abstract Builder setProjectId(ValueProvider<String> projectId); - abstract Builder setInstanceId(String instanceId); + abstract Builder setInstanceId(ValueProvider<String> instanceId); - abstract Builder setDatabaseId(String databaseId); + abstract Builder setDatabaseId(ValueProvider<String> databaseId); abstract Builder setBatchSizeBytes(long batchSizeBytes); @@ -162,6 +163,10 @@ public class SpannerIO { * <p>Does not modify this object. */ public Write withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + public Write withProjectId(ValueProvider<String> projectId) { return toBuilder().setProjectId(projectId).build(); } @@ -172,6 +177,10 @@ public class SpannerIO { * <p>Does not modify this object. */ public Write withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + public Write withInstanceId(ValueProvider<String> instanceId) { return toBuilder().setInstanceId(instanceId).build(); } @@ -191,6 +200,10 @@ public class SpannerIO { * <p>Does not modify this object. */ public Write withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } + + public Write withDatabaseId(ValueProvider<String> databaseId) { return toBuilder().setDatabaseId(databaseId).build(); } @@ -291,7 +304,7 @@ public class SpannerIO { SpannerOptions spannerOptions = getSpannerOptions(); spanner = spannerOptions.getService(); dbClient = spanner.getDatabaseClient( - DatabaseId.of(projectId(), spec.getInstanceId(), spec.getDatabaseId())); + DatabaseId.of(projectId(), spec.getInstanceId().get(), spec.getDatabaseId().get())); mutations = new ArrayList<>(); batchSizeBytes = 0; } @@ -309,7 +322,7 @@ public class SpannerIO { private String projectId() { return spec.getProjectId() == null ? ServiceOptions.getDefaultProjectId() - : spec.getProjectId(); + : spec.getProjectId().get(); } @FinishBundle @@ -334,7 +347,7 @@ public class SpannerIO { spannerOptionsBuider.setServiceFactory(spec.getServiceFactory()); } if (spec.getProjectId() != null) { - spannerOptionsBuider.setProjectId(spec.getProjectId()); + spannerOptionsBuider.setProjectId(spec.getProjectId().get()); } return spannerOptionsBuider.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/42a2de91/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 4a759fb..1e19a59 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -42,6 +45,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; import org.junit.Rule; @@ -231,6 +235,23 @@ public class SpannerIOTest implements Serializable { .writeAtLeastOnce(argThat(new IterableOfSize(3))); } + @Test + public void displayData() throws Exception { + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(123); + + DisplayData data = DisplayData.from(write); + assertThat(data.items(), hasSize(4)); + assertThat(data, hasDisplayItem("projectId", "test-project")); + assertThat(data, hasDisplayItem("instanceId", "test-instance")); + assertThat(data, hasDisplayItem("databaseId", "test-database")); + assertThat(data, hasDisplayItem("batchSizeBytes", 123)); + } + private static class FakeServiceFactory implements ServiceFactory<Spanner, SpannerOptions>, Serializable { // Marked as static so they could be returned by serviceFactory, which is serializable.