Repository: beam Updated Branches: refs/heads/master 92d1a6635 -> 7e97820c5
Upgrade to bigtable-client-core 0.9.5.1 Transitively bump netty version to 4.1.6.Final Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1caa1cd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1caa1cd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1caa1cd Branch: refs/heads/master Commit: d1caa1cde40cfb955c53e6cb67a86b1528b3935a Parents: 92d1a66 Author: Andrew Martin <[email protected]> Authored: Fri Mar 17 13:21:12 2017 -0400 Committer: Dan Halperin <[email protected]> Committed: Tue Mar 21 11:51:48 2017 -0700 ---------------------------------------------------------------------- pom.xml | 2 +- sdks/java/io/google-cloud-platform/pom.xml | 2 +- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 48 +------------------- .../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 3 +- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 +---- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 15 ++---- 6 files changed, 9 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 65c5012..a4b1090 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,7 @@ <joda.version>2.4</joda.version> <junit.version>4.12</junit.version> <mockito.version>1.9.5</mockito.version> - <netty.version>4.1.3.Final</netty.version> + <netty.version>4.1.6.Final</netty.version> <os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version> <protobuf.version>3.1.0</protobuf.version> <pubsub.version>v1-rev10-1.22.0</pubsub.version> http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 393db18..0d4f023 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -32,7 +32,7 @@ <packaging>jar</packaging> <properties> - <bigtable.version>0.9.2</bigtable.version> + <bigtable.version>0.9.5.1</bigtable.version> </properties> <build> http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 2d6cbba..7091e15 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -29,14 +29,12 @@ import com.google.bigtable.v2.SampleRowKeysResponse; import com.google.cloud.bigtable.config.BigtableOptions; import com.google.cloud.bigtable.config.CredentialOptions; import com.google.cloud.bigtable.config.CredentialOptions.CredentialType; -import com.google.cloud.bigtable.config.RetryOptions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; -import io.grpc.Status; import java.io.IOException; import java.util.Collections; import java.util.Iterator; @@ -213,20 +211,10 @@ public class BigtableIO { checkNotNull(optionsBuilder, "optionsBuilder"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. BigtableOptions options = optionsBuilder.build(); - RetryOptions retryOptions = options.getRetryOptions(); // Set data channel count to one because there is only 1 scanner in this session - // Use retryOptionsToBuilder because absent in Bigtable library - // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library - // Set batch size because of bug (incorrect initialization) in Bigtable library - // TODO: remove setRetryOptions when fixed in Bigtable library BigtableOptions.Builder clonedBuilder = options.toBuilder() - .setDataChannelCount(1) - .setRetryOptions( - retryOptionsToBuilder(retryOptions) - .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), - retryOptions.getStreamingBufferSize() / 2)) - .build()); + .setDataChannelCount(1); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService); @@ -454,22 +442,12 @@ public class BigtableIO { checkNotNull(optionsBuilder, "optionsBuilder"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. BigtableOptions options = optionsBuilder.build(); - RetryOptions retryOptions = options.getRetryOptions(); // Set useBulkApi to true for enabling bulk writes - // Use retryOptionsToBuilder because absent in Bigtable library - // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library - // Set batch size because of bug (incorrect initialization) in Bigtable library - // TODO: remove setRetryOptions when fixed in Bigtable library BigtableOptions.Builder clonedBuilder = options.toBuilder() .setBulkOptions( options.getBulkOptions().toBuilder() .setUseBulkApi(true) - .build()) - .setRetryOptions( - retryOptionsToBuilder(retryOptions) - .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), - retryOptions.getStreamingBufferSize() / 2)) .build()); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); return new Write(optionsWithAgent, tableId, bigtableService); @@ -1068,28 +1046,4 @@ public class BigtableIO { javaVersion, "0.3.0" /* TODO get Bigtable client version directly from jar. */); } - - /** - * A helper function to convert a RetryOptions into a RetryOptions.Builder. - */ - private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) { - RetryOptions.Builder builder = new RetryOptions.Builder(); - builder.setEnableRetries(options.enableRetries()); - builder.setInitialBackoffMillis(options.getInitialBackoffMillis()); - builder.setBackoffMultiplier(options.getBackoffMultiplier()); - builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis()); - builder.setStreamingBufferSize(options.getStreamingBufferSize()); - builder.setStreamingBatchSize(options.getStreamingBatchSize()); - builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis()); - builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries()); - builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp()); - - for (Status.Code code : Status.Code.values()) { - if (options.isRetryable(code)) { - builder.addStatusToRetryOn(code); - } - } - - return builder; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 717c6d3..0987140 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -55,10 +55,9 @@ public class GcpApiSurfaceTest { classesInPackage("com.google.auth"), classesInPackage("com.google.bigtable.v2"), classesInPackage("com.google.cloud.bigtable.config"), + Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class), Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class), Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class), - // https://github.com/GoogleCloudPlatform/cloud-bigtable-client/pull/1056 - classesInPackage("com.google.common.collect"), // via Bigtable, PR above out to fix. classesInPackage("com.google.datastore.v1"), classesInPackage("com.google.protobuf"), http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 878785b..1c770a2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -699,14 +699,10 @@ public class BigtableIOTest { BigtableIO.read().withBigtableOptions(optionsBuilder.build()); BigtableOptions options = read.getBigtableOptions(); - assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, - options.getRetryOptions().getStreamingBatchSize()); assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis()); assertThat(options.getRetryOptions(), - Matchers.equalTo(retryOptionsBuilder - .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE) - .build())); + Matchers.equalTo(retryOptionsBuilder.build())); } @Test @@ -731,8 +727,6 @@ public class BigtableIOTest { BigtableOptions options = write.getBigtableOptions(); assertEquals(true, options.getBulkOptions().useBulkApi()); assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs()); - assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, - options.getRetryOptions().getStreamingBatchSize()); assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis()); assertThat(options.getBulkOptions(), @@ -740,9 +734,7 @@ public class BigtableIOTest { .setUseBulkApi(true) .build())); assertThat(options.getRetryOptions(), - Matchers.equalTo(retryOptionsBuilder - .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE) - .build())); + Matchers.equalTo(retryOptionsBuilder.build())); } //////////////////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index d7792f4..240fb31 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -32,7 +32,6 @@ import com.google.bigtable.v2.RowSet; import com.google.cloud.bigtable.config.BigtableOptions; import com.google.cloud.bigtable.config.BigtableOptions.Builder; import com.google.cloud.bigtable.config.CredentialOptions; -import com.google.cloud.bigtable.config.RetryOptions; import com.google.cloud.bigtable.grpc.BigtableSession; import com.google.cloud.bigtable.grpc.BigtableTableAdminClient; import com.google.cloud.bigtable.grpc.scanner.ResultScanner; @@ -43,7 +42,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.options.GcpOptions; @@ -81,17 +79,11 @@ public class BigtableWriteIT implements Serializable { PipelineOptionsFactory.register(BigtableTestOptions.class); options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class); - // RetryOptions streamingBatchSize must be explicitly set for getTableData() - RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder(); - retryOptionsBuilder.setStreamingBatchSize( - retryOptionsBuilder.build().getStreamingBufferSize() / 2); - bigtableOptions = new Builder() .setProjectId(options.getProjectId()) .setInstanceId(options.getInstanceId()) .setUserAgent("apache-beam-test") - .setRetryOptions(retryOptionsBuilder.build()) .build(); session = @@ -137,8 +129,8 @@ public class BigtableWriteIT implements Serializable { // Test number of column families and column family name equality Table table = getTable(tableName); - assertThat(table.getColumnFamilies().keySet(), Matchers.hasSize(1)); - assertThat(table.getColumnFamilies(), Matchers.hasKey(COLUMN_FAMILY_NAME)); + assertThat(table.getColumnFamiliesMap().keySet(), Matchers.hasSize(1)); + assertThat(table.getColumnFamiliesMap(), Matchers.hasKey(COLUMN_FAMILY_NAME)); // Test table data equality List<KV<ByteString, ByteString>> tableData = getTableData(tableName); @@ -168,8 +160,7 @@ public class BigtableWriteIT implements Serializable { /** Helper function to create an empty table. */ private void createEmptyTable(String instanceName, String tableId) { Table.Builder tableBuilder = Table.newBuilder(); - Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies(); - columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build()); + tableBuilder.putColumnFamilies(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build()); CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder() .setParent(instanceName)
