Repository: beam Updated Branches: refs/heads/master 8b0449450 -> 8ee3572b4
Add localhost option for DatastoreIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da8d0dd8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da8d0dd8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da8d0dd8 Branch: refs/heads/master Commit: da8d0dd800bc0508b1f5b728211147de8ad9d086 Parents: 8b04494 Author: Vikas Kedigehalli <[email protected]> Authored: Fri Oct 28 15:31:38 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri Feb 3 02:05:22 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 147 ++++++++++++------- .../sdk/io/gcp/datastore/DatastoreV1Test.java | 15 +- .../sdk/io/gcp/datastore/SplitQueryFnIT.java | 2 +- 3 files changed, 107 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 1e8271c..4a219aa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -93,7 +93,8 @@ import org.slf4j.LoggerFactory; /** * {@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections} * of <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1 - * {@link Entity} objects. + * {@link Entity} objects. 
Read is only supported for Bounded PCollections while Write and Delete + * are supported for both Bounded and Unbounded PCollections. * * <p>This API currently requires an authentication workaround. To use {@link DatastoreV1}, users * must use the {@code gcloud} command line tool to get credentials for Cloud Datastore: * @@ -124,8 +125,10 @@ import org.slf4j.LoggerFactory; * * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across * many workers. However, when the {@link Query} is configured with a limit using - * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)}, then - * all returned results will be read by a single Dataflow worker in order to ensure correct data. + * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)} or if the Query contains + * inequality filters like {@code GREATER_THAN, LESS_THAN} etc., then all returned results + * will be read by a single worker in order to ensure correct data. Since data is read from + * a single worker, this could have a significant impact on the performance of the job. * * <p>To write a {@link PCollection} to a Cloud Datastore, use {@link DatastoreV1#write}, * specifying the Cloud Datastore project to write to: * @@ -176,6 +179,10 @@ import org.slf4j.LoggerFactory; * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up * </a>for security and permission related information specific to Cloud Datastore. * + * <p>Optionally, Cloud Datastore V1 Emulator, running locally, could be used for testing purposes + * by providing the host port information through {@code withLocalhost("host:port")} for all the + * above transforms. In such a case, all the Cloud Datastore API calls are directed to the Emulator. 
+ * * @see org.apache.beam.sdk.runners.PipelineRunner */ @Experimental(Experimental.Kind.SOURCE_SINK) @@ -231,6 +238,7 @@ public class DatastoreV1 { @Nullable public abstract Query getQuery(); @Nullable public abstract String getNamespace(); public abstract int getNumQuerySplits(); + @Nullable public abstract String getLocalhost(); @Override public abstract String toString(); @@ -243,6 +251,7 @@ public class DatastoreV1 { abstract Builder setQuery(Query query); abstract Builder setNamespace(String namespace); abstract Builder setNumQuerySplits(int numQuerySplits); + abstract Builder setLocalhost(String localhost); abstract Read build(); } @@ -410,10 +419,20 @@ public class DatastoreV1 { .build(); } + /** + * Returns a new {@link DatastoreV1.Read} that reads from a Datastore Emulator running at the + * given localhost address. + */ + public DatastoreV1.Read withLocalhost(String localhost) { + return toBuilder() + .setLocalhost(localhost) + .build(); + } + @Override public PCollection<Entity> expand(PBegin input) { V1Options v1Options = V1Options.from(getProjectId(), getQuery(), - getNamespace()); + getNamespace(), getLocalhost()); /* * This composite transform involves the following steps: @@ -469,34 +488,17 @@ public class DatastoreV1 { * A class for v1 Cloud Datastore related options. 
*/ @VisibleForTesting - static class V1Options implements Serializable { - private final Query query; - private final String projectId; - @Nullable - private final String namespace; - - private V1Options(String projectId, Query query, @Nullable String namespace) { - this.projectId = checkNotNull(projectId, "projectId"); - this.query = checkNotNull(query, "query"); - this.namespace = namespace; + @AutoValue + abstract static class V1Options implements Serializable { + public static V1Options from(String projectId, Query query, @Nullable String namespace, + @Nullable String localhost) { + return new AutoValue_DatastoreV1_Read_V1Options(projectId, query, namespace, localhost); } - public static V1Options from(String projectId, Query query, @Nullable String namespace) { - return new V1Options(projectId, query, namespace); - } - - public Query getQuery() { - return query; - } - - public String getProjectId() { - return projectId; - } - - @Nullable - public String getNamespace() { - return namespace; - } + public abstract String getProjectId(); + public abstract Query getQuery(); + @Nullable public abstract String getNamespace(); + @Nullable public abstract String getLocalhost(); } /** @@ -529,7 +531,8 @@ public class DatastoreV1 { @StartBundle public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), + options.getLocalhost()); querySplitter = datastoreFactory.getQuerySplitter(); } @@ -603,7 +606,8 @@ public class DatastoreV1 { @StartBundle public void startBundle(Context c) throws Exception { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId()); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId(), + options.getLocalhost()); } /** Read and output entities for the given query. 
*/ @@ -664,7 +668,7 @@ public class DatastoreV1 { * {@code projectId} using {@link DatastoreV1.Write#withProjectId}. */ public Write write() { - return new Write(null); + return new Write(null, null); } /** @@ -672,7 +676,7 @@ public class DatastoreV1 { * {@code projectId} using {@link DeleteEntity#withProjectId}. */ public DeleteEntity deleteEntity() { - return new DeleteEntity(null); + return new DeleteEntity(null, null); } /** @@ -680,7 +684,7 @@ public class DatastoreV1 { * {@code projectId} using {@link DeleteKey#withProjectId}. */ public DeleteKey deleteKey() { - return new DeleteKey(null); + return new DeleteKey(null, null); } /** @@ -693,8 +697,8 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - Write(@Nullable String projectId) { - super(projectId, new UpsertFn()); + Write(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new UpsertFn()); } /** @@ -702,7 +706,16 @@ public class DatastoreV1 { */ public Write withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new Write(projectId); + return new Write(projectId, null); + } + + /** + * Returns a new {@link Write} that writes to the Cloud Datastore Emulator running locally on + * the specified host port. + */ + public Write withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new Write(null, localhost); } } @@ -716,8 +729,8 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. 
*/ - DeleteEntity(@Nullable String projectId) { - super(projectId, new DeleteEntityFn()); + DeleteEntity(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new DeleteEntityFn()); } /** @@ -726,7 +739,16 @@ public class DatastoreV1 { */ public DeleteEntity withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteEntity(projectId); + return new DeleteEntity(projectId, null); + } + + /** + * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore Emulator + * running locally on the specified host port. + */ + public DeleteEntity withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new DeleteEntity(null, localhost); } } @@ -741,8 +763,8 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - DeleteKey(@Nullable String projectId) { - super(projectId, new DeleteKeyFn()); + DeleteKey(@Nullable String projectId, @Nullable String localhost) { + super(projectId, localhost, new DeleteKeyFn()); } /** @@ -751,7 +773,16 @@ public class DatastoreV1 { */ public DeleteKey withProjectId(String projectId) { checkNotNull(projectId, "projectId"); - return new DeleteKey(projectId); + return new DeleteKey(projectId, null); + } + + /** + * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore Emulator + * running locally on the specified host port. + */ + public DeleteKey withLocalhost(String localhost) { + checkNotNull(localhost, "localhost"); + return new DeleteKey(null, localhost); } } @@ -766,6 +797,8 @@ public class DatastoreV1 { private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> { @Nullable private final String projectId; + @Nullable + private final String localhost; /** A function that transforms each {@code T} into a mutation. 
*/ private final SimpleFunction<T, Mutation> mutationFn; @@ -773,15 +806,18 @@ public class DatastoreV1 { * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if * it is {@code null} at instantiation time, an error will be thrown. */ - Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) { + Mutate(@Nullable String projectId, @Nullable String localhost, + SimpleFunction<T, Mutation> mutationFn) { this.projectId = projectId; + this.localhost = localhost; this.mutationFn = checkNotNull(mutationFn); } @Override public PDone expand(PCollection<T> input) { input.apply("Convert to Mutation", MapElements.via(mutationFn)) - .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId))); + .apply("Write Mutation to Datastore", ParDo.of( + new DatastoreWriterFn(projectId, localhost))); return PDone.in(input.getPipeline()); } @@ -832,6 +868,8 @@ public class DatastoreV1 { static class DatastoreWriterFn extends DoFn<Mutation, Void> { private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class); private final String projectId; + @Nullable + private final String localhost; private transient Datastore datastore; private final V1DatastoreFactory datastoreFactory; // Current batch of mutations to be written. 
@@ -842,19 +880,21 @@ public class DatastoreV1 { FluentBackoff.DEFAULT .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); - DatastoreWriterFn(String projectId) { - this(projectId, new V1DatastoreFactory()); + DatastoreWriterFn(String projectId, @Nullable String localhost) { + this(projectId, localhost, new V1DatastoreFactory()); } @VisibleForTesting - DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) { + DatastoreWriterFn(String projectId, @Nullable String localhost, + V1DatastoreFactory datastoreFactory) { this.projectId = checkNotNull(projectId, "projectId"); + this.localhost = localhost; this.datastoreFactory = datastoreFactory; } @StartBundle public void startBundle(Context c) { - datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId); + datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId, localhost); } @ProcessElement @@ -1008,7 +1048,8 @@ public class DatastoreV1 { static class V1DatastoreFactory implements Serializable { /** Builds a Cloud Datastore client for the given pipeline options and project. 
*/ - public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId, + @Nullable String localhost) { Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); HttpRequestInitializer initializer; if (credential != null) { @@ -1024,6 +1065,10 @@ public class DatastoreV1 { .projectId(projectId) .initializer(initializer); + if (localhost != null) { + builder.localHost(localhost); + } + return DatastoreFactory.get().create(builder.build()); } http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index dd1904a..c2bc8d2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -110,12 +110,13 @@ public class DatastoreV1Test { private static final String NAMESPACE = "testNamespace"; private static final String KIND = "testKind"; private static final Query QUERY; + private static final String LOCALHOST = "localhost:9955"; private static final V1Options V_1_OPTIONS; static { Query.Builder q = Query.newBuilder(); q.addKindBuilder().setName(KIND); QUERY = q.build(); - V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE); + V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE, null); } private DatastoreV1.Read initialRead; @@ -136,7 +137,8 @@ public class DatastoreV1Test { initialRead = DatastoreIO.v1().read() 
.withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE); - when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class))) + when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class), + any(String.class))) .thenReturn(mockDatastore); when(mockDatastoreFactory.getQuerySplitter()) .thenReturn(mockQuerySplitter); @@ -157,10 +159,12 @@ public class DatastoreV1Test { @Test public void testBuildReadAlt() throws Exception { DatastoreV1.Read read = DatastoreIO.v1().read() - .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY); + .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY) + .withLocalhost(LOCALHOST); assertEquals(QUERY, read.getQuery()); assertEquals(PROJECT_ID, read.getProjectId()); assertEquals(NAMESPACE, read.getNamespace()); + assertEquals(LOCALHOST, read.getLocalhost()); } @Test @@ -504,7 +508,7 @@ public class DatastoreV1Test { @Test public void testDatastoreWriteFnDisplayData() { - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null); DisplayData displayData = DisplayData.from(datastoreWriter); assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID)); } @@ -539,7 +543,8 @@ public class DatastoreV1Test { makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build()); } - DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory); + DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null, + mockDatastoreFactory); DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); http://git-wip-us.apache.org/repos/asf/beam/blob/da8d0dd8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java index 4dd1608..49a60c6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java @@ -86,7 +86,7 @@ public class SplitQueryFnIT { query.addKindBuilder().setName(kind); SplitQueryFn splitQueryFn = new SplitQueryFn( - V1Options.from(projectId, query.build(), namespace), 0); + V1Options.from(projectId, query.build(), namespace, null), 0); DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn); List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());
