Modified addBulkOptions for simplicity
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9231826 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9231826 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9231826 Branch: refs/heads/master Commit: b9231826bb9c8084f3802206fbdd1d9f69fea3a6 Parents: 155409b Author: Ian Zhou <[email protected]> Authored: Thu Jul 7 10:27:47 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Thu Jul 7 21:53:35 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 95 +++++++++---------- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 99 ++++++++++---------- 2 files changed, 99 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index dd17abe..b4c3c75 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -47,7 +47,6 @@ import com.google.bigtable.v1.Row; import com.google.bigtable.v1.RowFilter; import com.google.bigtable.v1.SampleRowKeysResponse; import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.cloud.bigtable.config.BulkOptions; import com.google.cloud.bigtable.config.RetryOptions; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -207,10 +206,23 @@ public class BigtableIO { public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { checkNotNull(optionsBuilder, "optionsBuilder"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. - BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder(); - clonedBuilder.setDataChannelCount(1); - clonedBuilder = addRetryOptions(clonedBuilder); + BigtableOptions options = optionsBuilder.build(); + RetryOptions retryOptions = options.getRetryOptions(); + + // Set data channel count to one because there is only 1 scanner in this session + // Use retryOptionsToBuilder because absent in Bigtable library + // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library + // Set batch size because of bug (incorrect initialization) in Bigtable library + // TODO: remove setRetryOptions when fixed in Bigtable library + BigtableOptions.Builder clonedBuilder = options.toBuilder() + .setDataChannelCount(1) + .setRetryOptions( + retryOptionsToBuilder(retryOptions) + .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), + retryOptions.getStreamingBufferSize() / 2)) + .build()); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); + return new Read(optionsWithAgent, tableId, filter, bigtableService); } @@ -393,9 +405,24 @@ public class BigtableIO { public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { checkNotNull(optionsBuilder, "optionsBuilder"); // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. - BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder(); - clonedBuilder = addBulkOptions(clonedBuilder); - clonedBuilder = addRetryOptions(clonedBuilder); + BigtableOptions options = optionsBuilder.build(); + RetryOptions retryOptions = options.getRetryOptions(); + + // Set useBulkApi to true for enabling bulk writes + // Use retryOptionsToBuilder because absent in Bigtable library + // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library + // Set batch size because of bug (incorrect initialization) in Bigtable library + // TODO: remove setRetryOptions when fixed in Bigtable library + BigtableOptions.Builder clonedBuilder = options.toBuilder() + .setBulkOptions( + options.getBulkOptions().toBuilder() + .setUseBulkApi(true) + .build()) + .setRetryOptions( + retryOptionsToBuilder(retryOptions) + .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), + retryOptions.getStreamingBufferSize() / 2)) + .build()); BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build(); return new Write(optionsWithAgent, tableId, bigtableService); } @@ -1036,52 +1063,26 @@ public class BigtableIO { } /** - * A helper function to add appropriate bulk options. See - * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions - * toBuilder</a> for issue. - */ - static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) { - BulkOptions bulkOptions = builder.build().getBulkOptions(); - - BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder() - .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount()) - .setUseBulkApi(true) - .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount()) - .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize()) - .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs()) - .setMaxMemory(bulkOptions.getMaxMemory()); - - builder.setBulkOptions(bulkOptionsBuilder.build()); - return builder; - } - - /** - * A helper function to add appropriate retry options. See - * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions - * toBuilder</a> for issue. + * A helper function to convert a RetryOptions into a RetryOptions.Builder. */ - static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) { - RetryOptions retryOptions = builder.build().getRetryOptions(); - - RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder() - .setEnableRetries(retryOptions.enableRetries()) - .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis()) - .setBackoffMultiplier(retryOptions.getBackoffMultiplier()) - .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis()) - .setStreamingBufferSize(retryOptions.getStreamingBufferSize()) - .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), - retryOptions.getStreamingBufferSize() / 2)) - .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis()) - .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries()) - .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp()); + private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) { + RetryOptions.Builder builder = new RetryOptions.Builder(); + builder.setEnableRetries(options.enableRetries()); + builder.setInitialBackoffMillis(options.getInitialBackoffMillis()); + builder.setBackoffMultiplier(options.getBackoffMultiplier()); + builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis()); + builder.setStreamingBufferSize(options.getStreamingBufferSize()); + builder.setStreamingBatchSize(options.getStreamingBatchSize()); + builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis()); + builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries()); + builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp()); for (Status.Code code : Status.Code.values()) { - if (retryOptions.isRetryable(code)) { - retryOptionsBuilder.addStatusToRetryOn(code); + if (options.isRetryable(code)) { + builder.addStatusToRetryOn(code); } } - builder.setRetryOptions(retryOptionsBuilder.build()); return builder; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index cd16f54..a6a7f9d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -28,11 +28,9 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValu import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verifyNotNull; - import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -71,8 +69,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; -import io.grpc.Status; - import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; @@ -86,12 +82,10 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -536,55 +530,64 @@ public class BigtableIOTest { } @Test - public void testAddBulkOptions() { + public void testReadWithBigTableOptionsSetsRetryOptions() { + final int initialBackoffMillis = -1; + BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder(); - optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder); - BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions(); - assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT, - bulkOptions.getAsyncMutatorCount()); - assertEquals(true, bulkOptions.useBulkApi()); - assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT, - bulkOptions.getBulkMaxRowKeyCount()); - assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT, - bulkOptions.getBulkMaxRequestSize()); - assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT - * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs()); - assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory()); + RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder(); + retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis); + + optionsBuilder.setRetryOptions(retryOptionsBuilder.build()); + + BigtableIO.Read read = + BigtableIO.read().withBigtableOptions(optionsBuilder.build()); + + BigtableOptions options = read.getBigtableOptions(); + assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, + options.getRetryOptions().getStreamingBatchSize()); + assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis()); + + assertThat(options.getRetryOptions(), + Matchers.equalTo(retryOptionsBuilder + .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE) + .build())); } @Test - public void testAddRetryOptions() { - final double delta = 0.0000001; + public void testWriteWithBigTableOptionsSetsBulkOptionsAndRetryOptions() { + final int maxInflightRpcs = 1; + final int initialBackoffMillis = -1; + BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder(); - optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder); - - RetryOptions retryOptions = optionsBuilder.build().getRetryOptions(); - assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries()); - assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS, - retryOptions.getInitialBackoffMillis()); - assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(), - delta); - assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS, - retryOptions.getMaxElaspedBackoffMillis()); - assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize()); - assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize()); - assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS, - retryOptions.getReadPartialRowTimeoutMillis()); - assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES, - retryOptions.getMaxScanTimeoutRetries()); - assertFalse(retryOptions.allowRetriesWithoutTimestamp()); - - Set<Status.Code> statusToRetryOn = new HashSet<>(); - for (Status.Code code : Status.Code.values()) { - if (retryOptions.isRetryable(code)) { - statusToRetryOn.add(code); - } - } - Set<Status.Code> defaultStatusToRetryOn = - new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET); - assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray())); + BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder(); + bulkOptionsBuilder.setMaxInflightRpcs(maxInflightRpcs); + + RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder(); + retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis); + + optionsBuilder.setBulkOptions(bulkOptionsBuilder.build()) + .setRetryOptions(retryOptionsBuilder.build()); + + BigtableIO.Write write = + BigtableIO.write().withBigtableOptions(optionsBuilder.build()); + + BigtableOptions options = write.getBigtableOptions(); + assertEquals(true, options.getBulkOptions().useBulkApi()); + assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs()); + assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, + options.getRetryOptions().getStreamingBatchSize()); + assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis()); + + assertThat(options.getBulkOptions(), + Matchers.equalTo(bulkOptionsBuilder + .setUseBulkApi(true) + .build())); + assertThat(options.getRetryOptions(), + Matchers.equalTo(retryOptionsBuilder + .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE) + .build())); } ////////////////////////////////////////////////////////////////////////////////////////////
