http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 8aac417..91caded 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -52,7 +52,6 @@ public class GcpApiSurfaceTest { @SuppressWarnings("unchecked") final Set<Matcher<Class<?>>> allowedClasses = ImmutableSet.of( - classesInPackage("com.google.api.core"), classesInPackage("com.google.api.client.googleapis"), classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), @@ -61,18 +60,9 @@ public class GcpApiSurfaceTest { classesInPackage("com.google.auth"), classesInPackage("com.google.bigtable.v2"), classesInPackage("com.google.cloud.bigtable.config"), - classesInPackage("com.google.spanner.v1"), - Matchers.<Class<?>>equalTo(com.google.api.gax.grpc.ApiException.class), Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class), Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class), Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class), - Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.class), - Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.Error.class), - Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.class), - Matchers.<Class<?>>equalTo(com.google.cloud.BaseServiceException.ExceptionData.Builder - .class), - 
Matchers.<Class<?>>equalTo(com.google.cloud.RetryHelper.RetryHelperException.class), - Matchers.<Class<?>>equalTo(com.google.cloud.grpc.BaseGrpcServiceException.class), Matchers.<Class<?>>equalTo(com.google.cloud.ByteArray.class), Matchers.<Class<?>>equalTo(com.google.cloud.Date.class), Matchers.<Class<?>>equalTo(com.google.cloud.Timestamp.class),
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index d31f3a0..bfd260a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -82,7 +82,6 @@ import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -132,7 +131,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java 
index 91f0bae..a064bd6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigtable; import com.google.bigtable.v2.Row; import com.google.cloud.bigtable.config.BigtableOptions; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -42,10 +41,8 @@ public class BigtableReadIT { BigtableTestOptions options = TestPipeline.testingPipelineOptions() .as(BigtableTestOptions.class); - String project = options.as(GcpOptions.class).getProject(); - BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder() - .setProjectId(project) + .setProjectId(options.getProjectId()) .setInstanceId(options.getInstanceId()); final String tableId = "BigtableReadTest"; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java index 03cb697..0ab7576 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java @@ -25,6 +25,11 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; * Properties needed when using Bigtable with the Beam SDK. 
*/ public interface BigtableTestOptions extends TestPipelineOptions { + @Description("Project ID for Bigtable") + @Default.String("apache-beam-testing") + String getProjectId(); + void setProjectId(String value); + @Description("Instance ID for Bigtable") @Default.String("beam-test") String getInstanceId(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index 010bcc4..1d168f1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -73,17 +73,15 @@ public class BigtableWriteIT implements Serializable { private static BigtableTableAdminClient tableAdminClient; private final String tableId = String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date()); - private String project; @Before public void setup() throws Exception { PipelineOptionsFactory.register(BigtableTestOptions.class); options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class); - project = options.as(GcpOptions.class).getProject(); bigtableOptions = new Builder() - .setProjectId(project) + .setProjectId(options.getProjectId()) .setInstanceId(options.getInstanceId()) .setUserAgent("apache-beam-test") .build(); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java deleted file mode 100644 index c12cf55..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.datastore; - -import static org.hamcrest.Matchers.closeTo; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; - -import java.util.Random; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** - * Tests for {@link AdaptiveThrottler}. 
- */ -@RunWith(JUnit4.class) -public class AdaptiveThrottlerTest { - - static final long START_TIME_MS = 0; - static final long SAMPLE_PERIOD_MS = 60000; - static final long SAMPLE_BUCKET_MS = 1000; - static final double OVERLOAD_RATIO = 2; - - /** Returns a throttler configured with the standard parameters above. */ - AdaptiveThrottler getThrottler() { - return new AdaptiveThrottler(SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO); - } - - @Test - public void testNoInitialThrottling() throws Exception { - AdaptiveThrottler throttler = getThrottler(); - assertThat(throttler.throttlingProbability(START_TIME_MS), equalTo(0.0)); - assertThat("first request is not throttled", - throttler.throttleRequest(START_TIME_MS), equalTo(false)); - } - - @Test - public void testNoThrottlingIfNoErrors() throws Exception { - AdaptiveThrottler throttler = getThrottler(); - long t = START_TIME_MS; - for (; t < START_TIME_MS + 20; t++) { - assertFalse(throttler.throttleRequest(t)); - throttler.successfulRequest(t); - } - assertThat(throttler.throttlingProbability(t), equalTo(0.0)); - } - - @Test - public void testNoThrottlingAfterErrorsExpire() throws Exception { - AdaptiveThrottler throttler = getThrottler(); - long t = START_TIME_MS; - for (; t < START_TIME_MS + SAMPLE_PERIOD_MS; t++) { - throttler.throttleRequest(t); - // and no successfulRequest. 
- } - assertThat("check that we set up a non-zero probability of throttling", - throttler.throttlingProbability(t), greaterThan(0.0)); - for (; t < START_TIME_MS + 2 * SAMPLE_PERIOD_MS; t++) { - throttler.throttleRequest(t); - throttler.successfulRequest(t); - } - assertThat(throttler.throttlingProbability(t), equalTo(0.0)); - } - - @Test - public void testThrottlingAfterErrors() throws Exception { - Random mockRandom = Mockito.mock(Random.class); - Mockito.when(mockRandom.nextDouble()).thenReturn( - 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, - 0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9); - AdaptiveThrottler throttler = new AdaptiveThrottler( - SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO, mockRandom); - for (int i = 0; i < 20; i++) { - boolean throttled = throttler.throttleRequest(START_TIME_MS + i); - // 1/3rd of requests succeeding. - if (i % 3 == 1) { - throttler.successfulRequest(START_TIME_MS + i); - } - - // Once we have some history in place, check what throttling happens. - if (i >= 10) { - // Expect 1/3rd of requests to be throttled. (So 1/3rd throttled, 1/3rd succeeding, 1/3rd - // tried and failing). - assertThat(String.format("for i=%d", i), - throttler.throttlingProbability(START_TIME_MS + i), closeTo(0.33, /*error=*/ 0.1)); - // Requests 10..13 should be throttled, 14..19 not throttled given the mocked random numbers - // that we fed to throttler. 
- assertThat(String.format("for i=%d", i), throttled, equalTo(i < 14)); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java index a3f5d38..460049e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java @@ -27,7 +27,7 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeOrder; import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT; -import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START; +import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT; import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes; @@ -606,7 +606,7 @@ public class DatastoreV1Test { /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. 
*/ @Test public void testDatatoreWriterFnWithMultipleBatches() throws Exception { - datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100); + datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100); } /** @@ -615,7 +615,7 @@ public class DatastoreV1Test { */ @Test public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception { - datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 2); + datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2); } // A helper method to test DatastoreWriterFn for various batch sizes. @@ -628,14 +628,14 @@ public class DatastoreV1Test { } DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), - null, mockDatastoreFactory, new FakeWriteBatcher()); + null, mockDatastoreFactory); DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); int start = 0; while (start < numMutations) { - int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_ENTITIES_START); + int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end)); @@ -651,28 +651,26 @@ public class DatastoreV1Test { @Test public void testDatatoreWriterFnWithLargeEntities() throws Exception { List<Mutation> mutations = new ArrayList<>(); - int entitySize = 0; + int propertySize = 900_000; for (int i = 0; i < 12; ++i) { - Entity entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)) - .putProperties("long", makeValue(new String(new char[900_000]) - ).setExcludeFromIndexes(true).build()) - .build(); - entitySize = entity.getSerializedSize(); // Take the size of any one entity. 
- mutations.add(makeUpsert(entity).build()); + Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + i, i + 1)); + entity.putProperties("long", makeValue(new String(new char[propertySize]) + ).setExcludeFromIndexes(true).build()); + mutations.add(makeUpsert(entity.build()).build()); } DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID), - null, mockDatastoreFactory, new FakeWriteBatcher()); + null, mockDatastoreFactory); DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter); doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); doFnTester.processBundle(mutations); // This test is over-specific currently; it requires that we split the 12 entity writes into 3 // requests, but we only need each CommitRequest to be less than 10MB in size. - int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize; + int propertiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / propertySize; int start = 0; while (start < mutations.size()) { - int end = Math.min(mutations.size(), start + entitiesPerRpc); + int end = Math.min(mutations.size(), start + propertiesPerRpc); CommitRequest.Builder commitRequest = CommitRequest.newBuilder(); commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL); commitRequest.addAllMutations(mutations.subList(start, end)); @@ -783,7 +781,7 @@ public class DatastoreV1Test { */ @Test public void testSplitQueryFnWithQueryLimit() throws Exception { - Query queryWithLimit = QUERY.toBuilder() + Query queryWithLimit = QUERY.toBuilder().clone() .setLimit(Int32Value.newBuilder().setValue(1)) .build(); @@ -896,50 +894,6 @@ public class DatastoreV1Test { .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject())); } - @Test - public void testWriteBatcherWithoutData() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, 
writeBatcher.nextBatchSize(0)); - } - - @Test - public void testWriteBatcherFastQueries() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 1000, 200); - writeBatcher.addRequestLatency(0, 1000, 200); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0)); - } - - @Test - public void testWriteBatcherSlowQueries() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 10000, 200); - writeBatcher.addRequestLatency(0, 10000, 200); - assertEquals(100, writeBatcher.nextBatchSize(0)); - } - - @Test - public void testWriteBatcherSizeNotBelowMinimum() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 30000, 50); - writeBatcher.addRequestLatency(0, 30000, 50); - assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0)); - } - - @Test - public void testWriteBatcherSlidingWindow() { - DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl(); - writeBatcher.start(); - writeBatcher.addRequestLatency(0, 30000, 50); - writeBatcher.addRequestLatency(50000, 5000, 200); - writeBatcher.addRequestLatency(100000, 5000, 200); - assertEquals(200, writeBatcher.nextBatchSize(150000)); - } - /** Helper Methods */ /** A helper function that verifies if all the queries have unique keys. */ @@ -1079,24 +1033,8 @@ public class DatastoreV1Test { private List<Query> splitQuery(Query query, int numSplits) { List<Query> queries = new LinkedList<>(); for (int i = 0; i < numSplits; i++) { - queries.add(query.toBuilder().build()); + queries.add(query.toBuilder().clone().build()); } return queries; } - - /** - * A WriteBatcher for unit tests, which does no timing-based adjustments (so unit tests have - * consistent results). 
- */ - static class FakeWriteBatcher implements DatastoreV1.WriteBatcher { - @Override - public void start() {} - @Override - public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) { - } - @Override - public int nextBatchSize(long timeSinceEpochMillis) { - return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START; - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index cd61229..5e618df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -374,7 +374,7 @@ class V1TestUtil { // Read the next batch of query results. 
private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException { - Query.Builder query = this.query.toBuilder(); + Query.Builder query = this.query.toBuilder().clone(); query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT)); if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { query.setStartCursor(currentBatch.getEndCursor()); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java deleted file mode 100644 index 753d807..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; - -import com.google.cloud.ServiceFactory; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import javax.annotation.concurrent.GuardedBy; -import org.mockito.Matchers; - -/** - * A serialization friendly type service factory that maintains a mock {@link Spanner} and - * {@link DatabaseClient}. - * */ -class FakeServiceFactory - implements ServiceFactory<Spanner, SpannerOptions>, Serializable { - - // Marked as static so they could be returned by serviceFactory, which is serializable. - private static final Object lock = new Object(); - - @GuardedBy("lock") - private static final List<Spanner> mockSpanners = new ArrayList<>(); - - @GuardedBy("lock") - private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>(); - - @GuardedBy("lock") - private static int count = 0; - - private final int index; - - public FakeServiceFactory() { - synchronized (lock) { - index = count++; - mockSpanners.add(mock(Spanner.class, withSettings().serializable())); - mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable())); - } - when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class))) - .thenReturn(mockDatabaseClient()); - } - - DatabaseClient mockDatabaseClient() { - synchronized (lock) { - return mockDatabaseClients.get(index); - } - } - - Spanner mockSpanner() { - synchronized (lock) { - return mockSpanners.get(index); - } - } - - @Override - public Spanner create(SpannerOptions serviceOptions) { - return mockSpanner(); - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java index 013b83d..03eb28e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java @@ -135,16 +135,4 @@ public class MutationSizeEstimatorTest { assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L)); assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L)); } - - @Test - public void group() throws Exception { - Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build(); - Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build(); - - MutationGroup group = MutationGroup.create(int64, float64, bool); - - assertThat(MutationSizeEstimator.sizeOf(group), is(17L)); - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java deleted file mode 100644 index f479b4a..0000000 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/RandomUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import java.util.Random; - -/** - * Useful randomness related utilities. 
- */ -public class RandomUtils { - - private static final char[] ALPHANUMERIC = "1234567890abcdefghijklmnopqrstuvwxyz".toCharArray(); - - private RandomUtils() { - } - - public static String randomAlphaNumeric(int length) { - Random random = new Random(); - char[] result = new char[length]; - for (int i = 0; i < length; i++) { - result[i] = ALPHANUMERIC[random.nextInt(ALPHANUMERIC.length)]; - } - return new String(result); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java deleted file mode 100644 index 5ba2da0..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.KeySet; -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.spanner.ResultSets; -import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.Type; -import com.google.cloud.spanner.Value; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** Unit tests for {@link SpannerIO}. 
*/ -@RunWith(JUnit4.class) -public class SpannerIOReadTest implements Serializable { - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule - public final transient ExpectedException thrown = ExpectedException.none(); - - private FakeServiceFactory serviceFactory; - private ReadOnlyTransaction mockTx; - - private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()), - Type.StructField.of("name", Type.string())); - - private List<Struct> fakeRows = Arrays.asList( - Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(), - Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build()); - - @Before - @SuppressWarnings("unchecked") - public void setUp() throws Exception { - serviceFactory = new FakeServiceFactory(); - mockTx = Mockito.mock(ReadOnlyTransaction.class); - } - - @Test - public void emptyTransform() throws Exception { - SpannerIO.Read read = SpannerIO.read(); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - read.validate(null); - } - - @Test - public void emptyInstanceId() throws Exception { - SpannerIO.Read read = SpannerIO.read().withDatabaseId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - read.validate(null); - } - - @Test - public void emptyDatabaseId() throws Exception { - SpannerIO.Read read = SpannerIO.read().withInstanceId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires database id to be set with"); - read.validate(null); - } - - @Test - public void emptyQuery() throws Exception { - SpannerIO.Read read = - SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now()); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("requires configuring query or read operation"); - read.validate(null); - } - - @Test - public void 
emptyColumns() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires a list of columns"); - read.validate(null); - } - - @Test - public void validRead() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name", "email"); - read.validate(null); - } - - @Test - public void validQuery() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withQuery("SELECT * FROM users"); - read.validate(null); - } - - @Test - public void runQuery() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withQuery("SELECT * FROM users") - .withServiceFactory(serviceFactory); - - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); - - when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) - .thenReturn(mockTx); - when(mockTx.executeQuery(any(Statement.class))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); - - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); - - verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound - .strong()); - verify(mockTx).executeQuery(Statement.of("SELECT * FROM users")); - } - - @Test - public void runRead() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name") - 
.withServiceFactory(serviceFactory); - - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); - - when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) - .thenReturn(mockTx); - when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); - - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); - - verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); - verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name")); - } - - @Test - public void runReadUsingIndex() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name") - .withIndex("theindex") - .withServiceFactory(serviceFactory); - - NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); - DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn); - - when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) - .thenReturn(mockTx); - when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); - - List<Struct> result = fnTester.processBundle(1); - assertThat(result, Matchers.<Struct>iterableWithSize(2)); - - verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); - verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")); - } - - @Test - @Category(NeedsRunner.class) - public void readPipeline() throws Exception { - Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); - - PCollectionView<Transaction> tx = pipeline - .apply("tx", SpannerIO.createTransaction() - .withProjectId("test") - 
.withInstanceId("123") - .withDatabaseId("aaa") - .withServiceFactory(serviceFactory)); - - PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withQuery("SELECT * FROM users") - .withServiceFactory(serviceFactory) - .withTransaction(tx)); - PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read() - .withProjectId("test") - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name") - .withServiceFactory(serviceFactory) - .withTransaction(tx)); - - when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) - .thenReturn(mockTx); - - when(mockTx.executeQuery(Statement.of("SELECT 1"))).thenReturn(ResultSets.forRows(Type.struct(), - Collections.<Struct>emptyList())); - - when(mockTx.executeQuery(Statement.of("SELECT * FROM users"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); - when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) - .thenReturn(ResultSets.forRows(fakeType, fakeRows)); - when(mockTx.getReadTimestamp()).thenReturn(timestamp); - - PAssert.that(one).containsInAnyOrder(fakeRows); - PAssert.that(two).containsInAnyOrder(fakeRows); - - pipeline.run(); - - verify(serviceFactory.mockDatabaseClient(), times(2)) - .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java new file mode 100644 index 0000000..5bdfea5 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.google.api.core.ApiFuture; +import com.google.cloud.ServiceFactory; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import javax.annotation.concurrent.GuardedBy; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFnTester; +import 
org.apache.beam.sdk.values.PCollection; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; + + +/** + * Unit tests for {@link SpannerIO}. + */ +@RunWith(JUnit4.class) +public class SpannerIOTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + private FakeServiceFactory serviceFactory; + + @Before + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + serviceFactory = new FakeServiceFactory(); + } + + @Test + public void emptyTransform() throws Exception { + SpannerIO.Write write = SpannerIO.write(); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires instance id to be set with"); + write.validate(null); + } + + @Test + public void emptyInstanceId() throws Exception { + SpannerIO.Write write = SpannerIO.write().withDatabaseId("123"); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires instance id to be set with"); + write.validate(null); + } + + @Test + public void emptyDatabaseId() throws Exception { + SpannerIO.Write write = SpannerIO.write().withInstanceId("123"); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires database id to be set with"); + write.validate(null); + } + + @Test + @Category(NeedsRunner.class) + public void singleMutationPipeline() throws Exception { + Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build(); + PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation)); + + mutations.apply( + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + 
.withServiceFactory(serviceFactory)); + pipeline.run(); + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(1))); + } + + @Test + public void batching() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(1000000000) + .withServiceFactory(serviceFactory); + SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); + DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + fnTester.processBundle(Arrays.asList(one, two)); + + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(2))); + } + + @Test + public void batchingGroups() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + + // Have a room to accumulate one more item. 
+ long batchSize = MutationSizeEstimator.sizeOf(one) + 1; + + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(batchSize) + .withServiceFactory(serviceFactory); + SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); + DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + fnTester.processBundle(Arrays.asList(one, two, three)); + + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(2))); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(1))); + } + + @Test + public void noBatching() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(0) // turn off batching. + .withServiceFactory(serviceFactory); + SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); + DoFnTester<Mutation, Void> fnTester = DoFnTester.of(writerFn); + fnTester.processBundle(Arrays.asList(one, two)); + + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(2)) + .writeAtLeastOnce(argThat(new IterableOfSize(1))); + } + + private static class FakeServiceFactory + implements ServiceFactory<Spanner, SpannerOptions>, Serializable { + // Marked as static so they could be returned by serviceFactory, which is serializable. 
+ private static final Object lock = new Object(); + + @GuardedBy("lock") + private static final List<Spanner> mockSpanners = new ArrayList<>(); + + @GuardedBy("lock") + private static final List<DatabaseClient> mockDatabaseClients = new ArrayList<>(); + + @GuardedBy("lock") + private static int count = 0; + + private final int index; + + public FakeServiceFactory() { + synchronized (lock) { + index = count++; + mockSpanners.add(mock(Spanner.class, withSettings().serializable())); + mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable())); + } + ApiFuture voidFuture = mock(ApiFuture.class, withSettings().serializable()); + when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class))) + .thenReturn(mockDatabaseClient()); + when(mockSpanner().closeAsync()).thenReturn(voidFuture); + } + + DatabaseClient mockDatabaseClient() { + synchronized (lock) { + return mockDatabaseClients.get(index); + } + } + + Spanner mockSpanner() { + synchronized (lock) { + return mockSpanners.get(index); + } + } + + @Override + public Spanner create(SpannerOptions serviceOptions) { + return mockSpanner(); + } + } + + private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> { + private final int size; + + private IterableOfSize(int size) { + this.size = size; + } + + @Override + public boolean matches(Object argument) { + return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java deleted file mode 100644 index 09cdb8e..0000000 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Mutation; -import com.google.common.collect.Iterables; -import java.io.Serializable; -import java.util.Arrays; - -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import 
org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentMatcher; - -/** - * Unit tests for {@link SpannerIO}. - */ -@RunWith(JUnit4.class) -public class SpannerIOWriteTest implements Serializable { - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule public transient ExpectedException thrown = ExpectedException.none(); - - private FakeServiceFactory serviceFactory; - - @Before - @SuppressWarnings("unchecked") - public void setUp() throws Exception { - serviceFactory = new FakeServiceFactory(); - } - - @Test - public void emptyTransform() throws Exception { - SpannerIO.Write write = SpannerIO.write(); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - write.validate(null); - } - - @Test - public void emptyInstanceId() throws Exception { - SpannerIO.Write write = SpannerIO.write().withDatabaseId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - write.validate(null); - } - - @Test - public void emptyDatabaseId() throws Exception { - SpannerIO.Write write = SpannerIO.write().withInstanceId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires database id to be set with"); - write.validate(null); - } - - @Test - @Category(NeedsRunner.class) - public void singleMutationPipeline() throws Exception { - Mutation mutation = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2).build(); - PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation)); - - mutations.apply( - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory)); - pipeline.run(); - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(1)) - 
.writeAtLeastOnce(argThat(new IterableOfSize(1))); - } - - @Test - @Category(NeedsRunner.class) - public void singleMutationGroupPipeline() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); - Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); - PCollection<MutationGroup> mutations = pipeline - .apply(Create.<MutationGroup>of(g(one, two, three))); - mutations.apply( - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withServiceFactory(serviceFactory) - .grouped()); - pipeline.run(); - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(1)) - .writeAtLeastOnce(argThat(new IterableOfSize(3))); - } - - @Test - public void batching() throws Exception { - MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); - MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); - SpannerIO.Write write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withBatchSizeBytes(1000000000) - .withServiceFactory(serviceFactory); - SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); - DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); - fnTester.processBundle(Arrays.asList(one, two)); - - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(1)) - .writeAtLeastOnce(argThat(new IterableOfSize(2))); - } - - @Test - public void batchingGroups() throws Exception { - MutationGroup one = 
g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); - MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); - MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build()); - - // Have a room to accumulate one more item. - long batchSize = MutationSizeEstimator.sizeOf(one) + 1; - - SpannerIO.Write write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withBatchSizeBytes(batchSize) - .withServiceFactory(serviceFactory); - SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); - DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); - fnTester.processBundle(Arrays.asList(one, two, three)); - - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(1)) - .writeAtLeastOnce(argThat(new IterableOfSize(2))); - verify(serviceFactory.mockDatabaseClient(), times(1)) - .writeAtLeastOnce(argThat(new IterableOfSize(1))); - } - - @Test - public void noBatching() throws Exception { - MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); - MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); - SpannerIO.Write write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withBatchSizeBytes(0) // turn off batching. 
- .withServiceFactory(serviceFactory); - SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); - DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); - fnTester.processBundle(Arrays.asList(one, two)); - - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(2)) - .writeAtLeastOnce(argThat(new IterableOfSize(1))); - } - - @Test - public void groups() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); - Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); - - // Smallest batch size - long batchSize = 1; - - SpannerIO.Write write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withBatchSizeBytes(batchSize) - .withServiceFactory(serviceFactory); - SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); - DoFnTester<MutationGroup, Void> fnTester = DoFnTester.of(writerFn); - fnTester.processBundle(Arrays.asList(g(one, two, three))); - - verify(serviceFactory.mockSpanner()) - .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); - verify(serviceFactory.mockDatabaseClient(), times(1)) - .writeAtLeastOnce(argThat(new IterableOfSize(3))); - } - - @Test - public void displayData() throws Exception { - SpannerIO.Write write = - SpannerIO.write() - .withProjectId("test-project") - .withInstanceId("test-instance") - .withDatabaseId("test-database") - .withBatchSizeBytes(123); - - DisplayData data = DisplayData.from(write); - assertThat(data.items(), hasSize(4)); - assertThat(data, hasDisplayItem("projectId", "test-project")); - assertThat(data, hasDisplayItem("instanceId", "test-instance")); - assertThat(data, hasDisplayItem("databaseId", 
"test-database")); - assertThat(data, hasDisplayItem("batchSizeBytes", 123)); - } - - private static class IterableOfSize extends ArgumentMatcher<Iterable<Mutation>> { - private final int size; - - private IterableOfSize(int size) { - this.size = size; - } - - @Override - public boolean matches(Object argument) { - return argument instanceof Iterable && Iterables.size((Iterable<?>) argument) == size; - } - } - - private static MutationGroup g(Mutation m, Mutation... other) { - return MutationGroup.create(m, other); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java deleted file mode 100644 index d866975..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.Database; -import com.google.cloud.spanner.DatabaseAdminClient; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Operation; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; -import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** End-to-end test of Cloud Spanner Source. */ -@RunWith(JUnit4.class) -public class SpannerReadIT { - - private static final int MAX_DB_NAME_LENGTH = 30; - - @Rule public final transient TestPipeline p = TestPipeline.create(); - - /** Pipeline options for this test. 
*/ - public interface SpannerTestPipelineOptions extends TestPipelineOptions { - @Description("Instance ID to write to in Spanner") - @Default.String("beam-test") - String getInstanceId(); - void setInstanceId(String value); - - @Description("Database ID prefix to write to in Spanner") - @Default.String("beam-testdb") - String getDatabaseIdPrefix(); - void setDatabaseIdPrefix(String value); - - @Description("Table name") - @Default.String("users") - String getTable(); - void setTable(String value); - } - - private Spanner spanner; - private DatabaseAdminClient databaseAdminClient; - private SpannerTestPipelineOptions options; - private String databaseName; - private String project; - - @Before - public void setUp() throws Exception { - PipelineOptionsFactory.register(SpannerTestPipelineOptions.class); - options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class); - - project = options.as(GcpOptions.class).getProject(); - - spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService(); - - databaseName = generateDatabaseName(); - - databaseAdminClient = spanner.getDatabaseAdminClient(); - - // Delete database if exists. 
- databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); - - Operation<Database, CreateDatabaseMetadata> op = - databaseAdminClient.createDatabase( - options.getInstanceId(), - databaseName, - Collections.singleton( - "CREATE TABLE " - + options.getTable() - + " (" - + " Key INT64," - + " Value STRING(MAX)," - + ") PRIMARY KEY (Key)")); - op.waitFor(); - } - - @Test - public void testRead() throws Exception { - DatabaseClient databaseClient = - spanner.getDatabaseClient( - DatabaseId.of( - project, options.getInstanceId(), databaseName)); - - List<Mutation> mutations = new ArrayList<>(); - for (int i = 0; i < 5L; i++) { - mutations.add( - Mutation.newInsertOrUpdateBuilder(options.getTable()) - .set("key") - .to((long) i) - .set("value") - .to(RandomUtils.randomAlphaNumeric(100)) - .build()); - } - - databaseClient.writeAtLeastOnce(mutations); - - SpannerConfig spannerConfig = SpannerConfig.create() - .withProjectId(project) - .withInstanceId(options.getInstanceId()) - .withDatabaseId(databaseName); - - PCollectionView<Transaction> tx = - p.apply( - SpannerIO.createTransaction() - .withSpannerConfig(spannerConfig) - .withTimestampBound(TimestampBound.strong())); - - PCollection<Struct> output = - p.apply( - SpannerIO.read() - .withSpannerConfig(spannerConfig) - .withQuery("SELECT * FROM " + options.getTable()) - .withTransaction(tx)); - PAssert.thatSingleton(output.apply("Count rows", Count.<Struct>globally())).isEqualTo(5L); - p.run(); - } - - @After - public void tearDown() throws Exception { - databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); - spanner.close(); - } - - private String generateDatabaseName() { - String random = RandomUtils - .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length()); - return options.getDatabaseIdPrefix() + "-" + random; - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java index d208f5c..8df224b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java @@ -33,7 +33,7 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import java.util.Collections; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; + import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; @@ -42,6 +42,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -59,6 +60,11 @@ public class SpannerWriteIT { /** Pipeline options for this test. 
*/ public interface SpannerTestPipelineOptions extends TestPipelineOptions { + @Description("Project ID for Spanner") + @Default.String("apache-beam-testing") + String getProjectId(); + void setProjectId(String value); + @Description("Instance ID to write to in Spanner") @Default.String("beam-test") String getInstanceId(); @@ -79,16 +85,13 @@ public class SpannerWriteIT { private DatabaseAdminClient databaseAdminClient; private SpannerTestPipelineOptions options; private String databaseName; - private String project; @Before public void setUp() throws Exception { PipelineOptionsFactory.register(SpannerTestPipelineOptions.class); options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class); - project = options.as(GcpOptions.class).getProject(); - - spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService(); + spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService(); databaseName = generateDatabaseName(); @@ -112,8 +115,9 @@ public class SpannerWriteIT { } private String generateDatabaseName() { - String random = RandomUtils - .randomAlphaNumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length()); + String random = RandomStringUtils + .randomAlphanumeric(MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length()) + .toLowerCase(); return options.getDatabaseIdPrefix() + "-" + random; } @@ -123,7 +127,7 @@ public class SpannerWriteIT { .apply(ParDo.of(new GenerateMutations(options.getTable()))) .apply( SpannerIO.write() - .withProjectId(project) + .withProjectId(options.getProjectId()) .withInstanceId(options.getInstanceId()) .withDatabaseId(databaseName)); @@ -131,7 +135,7 @@ public class SpannerWriteIT { DatabaseClient databaseClient = spanner.getDatabaseClient( DatabaseId.of( - project, options.getInstanceId(), databaseName)); + options.getProjectId(), options.getInstanceId(), databaseName)); ResultSet resultSet = databaseClient @@ -145,7 +149,7 @@ public class 
SpannerWriteIT { @After public void tearDown() throws Exception { databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); - spanner.close(); + spanner.closeAsync().get(); } private static class GenerateMutations extends DoFn<Long, Mutation> { @@ -161,7 +165,7 @@ public class SpannerWriteIT { Mutation.WriteBuilder builder = Mutation.newInsertOrUpdateBuilder(table); Long key = c.element(); builder.set("Key").to(key); - builder.set("Value").to(RandomUtils.randomAlphaNumeric(valueSize)); + builder.set("Value").to(RandomStringUtils.randomAlphabetic(valueSize)); Mutation mutation = builder.build(); c.output(mutation); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-common/pom.xml b/sdks/java/io/hadoop-common/pom.xml index 4bcbcd7..8749243 100644 --- a/sdks/java/io/hadoop-common/pom.xml +++ b/sdks/java/io/hadoop-common/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop-file-system/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop-file-system/pom.xml b/sdks/java/io/hadoop-file-system/pom.xml index a9c2e57..db5a1db 100644 --- a/sdks/java/io/hadoop-file-system/pom.xml +++ b/sdks/java/io/hadoop-file-system/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -44,6 +44,37 @@ </plugins> </build> + <properties> + <!-- + This is the version of Hadoop used to compile the hadoop-common module. 
+ This dependency is defined with a provided scope. + Users must supply their own Hadoop version at runtime. + --> + <hadoop.version>2.7.3</hadoop.version> + </properties> + + <dependencyManagement> + <!-- + We define dependencies here instead of sdks/java/io because + of a version mismatch between this Hadoop version and other + Hadoop versions declared in other io submodules. + --> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>org.apache.beam</groupId> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml index 0953119..06f9f11 100644 --- a/sdks/java/io/hadoop/input-format/pom.xml +++ b/sdks/java/io/hadoop/input-format/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hadoop-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java 
b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java index 0b4c23f..efd47fd 100644 --- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java +++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java @@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory; * } * </pre> */ -@Experimental(Experimental.Kind.SOURCE_SINK) +@Experimental public class HadoopInputFormatIO { private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatIO.class); http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml index 12944f4..9f84e88 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml +++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml @@ -26,7 +26,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hadoop-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <artifactId>beam-sdks-java-io-hadoop-jdk1.8-tests</artifactId> @@ -108,11 +108,13 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> + <version>${spark.version}</version> <scope>runtime</scope> <exclusions> <exclusion> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java index 3f866a4..8745521 100644 --- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java +++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.hadoop.inputformat; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.net.ServerSocket; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -77,7 +76,7 @@ public class HIFIOWithElasticTest implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HIFIOWithElasticTest.class); private static final String ELASTIC_IN_MEM_HOSTNAME = "127.0.0.1"; - private static String elasticInMemPort = "9200"; + private static final String ELASTIC_IN_MEM_PORT = "9200"; private static final String ELASTIC_INTERNAL_VERSION = "5.x"; private static final String TRUE = "true"; private static final String ELASTIC_INDEX_NAME = "beamdb"; @@ -95,10 +94,6 @@ public class HIFIOWithElasticTest implements Serializable { @BeforeClass public static void startServer() throws NodeValidationException, InterruptedException, IOException { - ServerSocket serverSocket = new ServerSocket(0); - int port = serverSocket.getLocalPort(); - serverSocket.close(); - elasticInMemPort = String.valueOf(port); ElasticEmbeddedServer.startElasticEmbeddedServer(); } @@ -178,7 +173,7 @@ public class HIFIOWithElasticTest implements Serializable { public Configuration getConfiguration() { Configuration conf = new Configuration(); conf.set(ConfigurationOptions.ES_NODES, ELASTIC_IN_MEM_HOSTNAME); - 
conf.set(ConfigurationOptions.ES_PORT, String.format("%s", elasticInMemPort)); + conf.set(ConfigurationOptions.ES_PORT, String.format("%s", ELASTIC_IN_MEM_PORT)); conf.set(ConfigurationOptions.ES_RESOURCE, ELASTIC_RESOURCE); conf.set("es.internal.es.version", ELASTIC_INTERNAL_VERSION); conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, TRUE); @@ -214,7 +209,7 @@ public class HIFIOWithElasticTest implements Serializable { Settings settings = Settings.builder() .put("node.data", TRUE) .put("network.host", ELASTIC_IN_MEM_HOSTNAME) - .put("http.port", elasticInMemPort) + .put("http.port", ELASTIC_IN_MEM_PORT) .put("path.data", elasticTempFolder.getRoot().getPath()) .put("path.home", elasticTempFolder.getRoot().getPath()) .put("transport.type", "local") http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/pom.xml b/sdks/java/io/hadoop/pom.xml index bc3569d..a1c7a2e 100644 --- a/sdks/java/io/hadoop/pom.xml +++ b/sdks/java/io/hadoop/pom.xml @@ -20,7 +20,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> <packaging>pom</packaging> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml index 40f516a..746b993 100644 --- a/sdks/java/io/hbase/pom.xml +++ b/sdks/java/io/hbase/pom.xml @@ -22,7 +22,7 @@ <parent> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-parent</artifactId> - <version>2.2.0-SNAPSHOT</version> + <version>2.1.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -31,7 +31,8 @@ <description>Library to read and write from/to HBase</description> <properties> - 
<hbase.version>1.2.6</hbase.version> + <hbase.version>1.2.5</hbase.version> + <hbase.hadoop.version>2.5.1</hbase.hadoop.version> </properties> <build> @@ -63,12 +64,6 @@ </dependency> <dependency> - <groupId>com.google.auto.service</groupId> - <artifactId>auto-service</artifactId> - <optional>true</optional> - </dependency> - - <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-shaded-client</artifactId> <version>${hbase.version}</version> @@ -108,26 +103,15 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> + <version>${hbase.hadoop.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> + <version>${hbase.hadoop.version}</version> <scope>test</scope> - <exclusions> - <!-- Fix build on JDK-9 --> - <exclusion> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java deleted file mode 100644 index 2973d1b..0000000 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseCoderProviderRegistrar.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.hbase; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import java.util.List; -import org.apache.beam.sdk.coders.CoderProvider; -import org.apache.beam.sdk.coders.CoderProviderRegistrar; -import org.apache.beam.sdk.coders.CoderProviders; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.hadoop.hbase.client.Result; - -/** - * A {@link CoderProviderRegistrar} for standard types used with {@link HBaseIO}. - */ -@AutoService(CoderProviderRegistrar.class) -public class HBaseCoderProviderRegistrar implements CoderProviderRegistrar { - @Override - public List<CoderProvider> getCoderProviders() { - return ImmutableList.of( - HBaseMutationCoder.getCoderProvider(), - CoderProviders.forCoder(TypeDescriptor.of(Result.class), HBaseResultCoder.of())); - } -}
