Repository: incubator-beam
Updated Branches:
  refs/heads/master 4a0e426a8 -> 38866cd55
Update to bigtable-client-core-0.3.0 and use bulk writes

Generally more stable, plus bulk writes bring 5x write throughput in batch jobs by more efficiently using the network.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/653c504f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/653c504f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/653c504f

Branch: refs/heads/master
Commit: 653c504f2f9460bc8861d149694ed2595701ce16
Parents: 4a0e426
Author: Ian Zhou <[email protected]>
Authored: Thu Jun 16 13:52:11 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Thu Jun 30 13:05:07 2016 -0700

----------------------------------------------------------------------
 pom.xml                                      |  2 +-
 sdks/java/core/pom.xml                       |  2 +-
 sdks/java/io/google-cloud-platform/pom.xml   |  2 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 60 +++++++++++++++++++-
 .../io/gcp/bigtable/BigtableServiceImpl.java | 31 +++++-----
 .../sdk/io/gcp/bigtable/BigtableIOTest.java  | 59 +++++++++++++++++++
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 18 +++++-
 7 files changed, 150 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f1eaac..14a9c67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
     <google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
     <guava.version>19.0</guava.version>
-    <grpc.version>0.12.0</grpc.version>
+    <grpc.version>0.13.1</grpc.version>
     <hamcrest.version>1.3</hamcrest.version>
     <jackson.version>2.7.2</jackson.version>
     <findbugs.version>3.0.1</findbugs.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 67c7fe9..9ec8f3d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -311,7 +311,7 @@
         <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-handler</artifactId>
-         <version>4.1.0.Beta8</version>
+         <version>4.1.0.CR1</version>
        </dependency>

        <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index c95ea71..c7e77f1 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
   <packaging>jar</packaging>

   <properties>
-    <bigtable.version>0.2.3</bigtable.version>
+    <bigtable.version>0.3.0</bigtable.version>
   </properties>

   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index cddb333..47c68dd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -47,6 +47,8 @@ import com.google.bigtable.v1.Row;
 import com.google.bigtable.v1.RowFilter;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
@@ -54,6 +56,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 
+import io.grpc.Status;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,6 +208,8 @@ public class BigtableIO {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
       BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      clonedBuilder.setDataChannelCount(1);
+      clonedBuilder = addRetryOptions(clonedBuilder);
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
       return new Read(optionsWithAgent, tableId, filter, bigtableService);
     }
@@ -388,6 +394,8 @@ public class BigtableIO {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
       BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      clonedBuilder = addBulkOptions(clonedBuilder);
+      clonedBuilder = addRetryOptions(clonedBuilder);
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
       return new Write(optionsWithAgent, tableId, bigtableService);
     }
@@ -1024,6 +1032,56 @@ public class BigtableIO {
         info.getName(),
         info.getVersion(),
         javaVersion,
-        "0.2.3" /* TODO get Bigtable client version directly from jar. */);
+        "0.3.0" /* TODO get Bigtable client version directly from jar. */);
+  }
+
+  /**
+   * A helper function to add appropriate bulk options. See
+   <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+   * toBuilder</a> for issue.
+   */
+  static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) {
+    BulkOptions bulkOptions = builder.build().getBulkOptions();
+
+    BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder()
+        .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount())
+        .setUseBulkApi(true)
+        .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount())
+        .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize())
+        .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs())
+        .setMaxMemory(bulkOptions.getMaxMemory());
+
+    builder.setBulkOptions(bulkOptionsBuilder.build());
+    return builder;
+  }
+
+  /**
+   * A helper function to add appropriate retry options. See
+   <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+   * toBuilder</a> for issue.
+   */
+  static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) {
+    RetryOptions retryOptions = builder.build().getRetryOptions();
+
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder()
+        .setEnableRetries(retryOptions.enableRetries())
+        .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis())
+        .setBackoffMultiplier(retryOptions.getBackoffMultiplier())
+        .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis())
+        .setStreamingBufferSize(retryOptions.getStreamingBufferSize())
+        .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+            retryOptions.getStreamingBufferSize() / 2))
+        .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis())
+        .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries())
+        .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp());
+
+    for (Status.Code code : Status.Code.values()) {
+      if (retryOptions.isRetryable(code)) {
+        retryOptionsBuilder.addStatusToRetryOn(code);
+      }
+    }
+
+    builder.setRetryOptions(retryOptionsBuilder.build());
+    return builder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 5933e13..a0e6b29 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -30,8 +30,9 @@ import com.google.bigtable.v1.SampleRowKeysRequest;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableName;
 import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.async.BulkMutation;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
 import com.google.common.base.MoreObjects;
 import com.google.common.io.Closer;
@@ -65,7 +66,7 @@ class BigtableServiceImpl implements BigtableService {
   @Override
   public BigtableWriterImpl openForWriting(String tableId) throws IOException {
     BigtableSession session = new BigtableSession(options);
-    String tableName = options.getClusterName().toTableNameStr(tableId);
+    BigtableTableName tableName = options.getClusterName().toTableName(tableId);
     return new BigtableWriterImpl(session, tableName);
   }
 
@@ -170,24 +171,23 @@ class BigtableServiceImpl implements BigtableService {
   private static class BigtableWriterImpl implements Writer {
     private BigtableSession session;
     private AsyncExecutor executor;
+    private BulkMutation bulkMutation;
     private final MutateRowRequest.Builder partialBuilder;
 
-    public BigtableWriterImpl(BigtableSession session, String tableName) {
+    public BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
       this.session = session;
-      this.executor =
-          new AsyncExecutor(
-              session.getDataClient(),
-              new HeapSizeManager(
-                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
-                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
-      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+      executor = session.createAsyncExecutor();
+      bulkMutation = session.createBulkMutation(tableName, executor);
+
+      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName.toString());
     }
 
     @Override
     public void close() throws IOException {
       try {
-        if (executor != null) {
+        if (bulkMutation != null) {
+          bulkMutation.flush();
+          bulkMutation = null;
           executor.flush();
           executor = null;
         }
@@ -208,12 +208,7 @@ class BigtableServiceImpl implements BigtableService {
           .setRowKey(record.getKey())
           .addAllMutations(record.getValue())
           .build();
-      try {
-        return executor.mutateRowAsync(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Write interrupted", e);
-      }
+      return bulkMutation.add(r);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cdbaaac..6a6197e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
@@ -54,6 +55,8 @@ import com.google.bigtable.v1.Row;
 import com.google.bigtable.v1.RowFilter;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
@@ -63,6 +66,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 
+import io.grpc.Status;
+
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -76,10 +81,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -520,6 +527,58 @@ public class BigtableIOTest {
     reader.close();
   }
 
+  @Test
+  public void testAddBulkOptions() {
+    BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+    optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder);
+
+    BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions();
+    assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT,
+        bulkOptions.getAsyncMutatorCount());
+    assertEquals(true, bulkOptions.useBulkApi());
+    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT,
+        bulkOptions.getBulkMaxRowKeyCount());
+    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT,
+        bulkOptions.getBulkMaxRequestSize());
+    assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT
+        * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs());
+    assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory());
+  }
+
+  @Test
+  public void testAddRetryOptions() {
+    final double delta = 0.0000001;
+    BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+    optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder);
+
+    RetryOptions retryOptions = optionsBuilder.build().getRetryOptions();
+    assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries());
+    assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS,
+        retryOptions.getInitialBackoffMillis());
+    assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(),
+        delta);
+    assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS,
+        retryOptions.getMaxElaspedBackoffMillis());
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize());
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize());
+    assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS,
+        retryOptions.getReadPartialRowTimeoutMillis());
+    assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES,
+        retryOptions.getMaxScanTimeoutRetries());
+    assertFalse(retryOptions.allowRetriesWithoutTimestamp());
+
+    Set<Status.Code> statusToRetryOn = new HashSet<>();
+    for (Status.Code code : Status.Code.values()) {
+      if (retryOptions.isRetryable(code)) {
+        statusToRetryOn.add(code);
+      }
+    }
+
+    Set<Status.Code> defaultStatusToRetryOn =
+        new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET);
+    assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray()));
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////////////
   private static final String COLUMN_FAMILY_NAME = "family";
   private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index af7afc5..8e17761 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -35,7 +35,9 @@ import com.google.bigtable.admin.table.v1.Table;
 import com.google.bigtable.v1.Mutation;
 import com.google.bigtable.v1.ReadRowsRequest;
 import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
 import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
@@ -78,11 +80,17 @@ public class BigtableWriteIT implements Serializable {
     PipelineOptionsFactory.register(BigtableTestOptions.class);
     options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
 
+    // RetryOptions streamingBatchSize must be explicitly set for getTableData()
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+    retryOptionsBuilder.setStreamingBatchSize(
+        retryOptionsBuilder.build().getStreamingBufferSize() / 2);
+
     BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
         .setProjectId(options.getProjectId())
         .setClusterId(options.getClusterId())
         .setZoneId(options.getZoneId())
-        .setUserAgent("apache-beam-test");
+        .setUserAgent("apache-beam-test")
+        .setRetryOptions(retryOptionsBuilder.build());
 
     bigtableOptions = bigtableOptionsBuilder.build();
     session = new BigtableSession(bigtableOptions);
@@ -172,9 +180,15 @@ public class BigtableWriteIT implements Serializable {
 
   /** Helper function to get a table's data. */
   private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
+    // Add empty range to avoid TARGET_NOT_SET error
+    RowRange range = RowRange.newBuilder()
+        .setStartKey(ByteString.EMPTY)
+        .setEndKey(ByteString.EMPTY)
+        .build();
     List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
     ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
-        .setTableName(tableName)
+        .setTableName(tableName)
+        .setRowRange(range);
     ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
     Row currentRow;
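
For readers skimming the diff, the heart of the change is in BigtableWriterImpl: instead of one mutateRowAsync RPC per element, each MutateRowRequest is handed to a BulkMutation created from the session, which batches rows into bulk RPCs and is flushed on close. The sketch below mirrors that pattern directly against bigtable-client-core 0.3.0 using only the calls that appear in the diff (createAsyncExecutor, createBulkMutation, add, flush). It is an illustration, not code from this commit; the project, zone, cluster, table, column family, and user agent are placeholders you would replace to run it, and resource cleanup is simplified.

import com.google.bigtable.v1.MutateRowRequest;
import com.google.bigtable.v1.Mutation;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
import com.google.cloud.bigtable.grpc.async.BulkMutation;
import com.google.protobuf.ByteString;

/** Illustrative sketch of the bulk-write pattern BigtableIO.Write now uses internally. */
public class BulkWriteSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder IDs; substitute a real project/zone/cluster to run this.
    BigtableOptions options = new BigtableOptions.Builder()
        .setProjectId("my-project")
        .setZoneId("us-central1-b")
        .setClusterId("my-cluster")
        .setUserAgent("bulk-write-sketch")
        .build();

    try (BigtableSession session = new BigtableSession(options)) {
      // Same construction as the new BigtableWriterImpl: one AsyncExecutor plus a
      // BulkMutation that batches individual MutateRowRequests into bulk RPCs.
      BigtableTableName tableName = options.getClusterName().toTableName("my-table");
      AsyncExecutor executor = session.createAsyncExecutor();
      BulkMutation bulkMutation = session.createBulkMutation(tableName, executor);

      for (int i = 0; i < 1000; i++) {
        MutateRowRequest request = MutateRowRequest.newBuilder()
            .setTableName(tableName.toString())
            .setRowKey(ByteString.copyFromUtf8("key-" + i))
            .addMutations(Mutation.newBuilder()
                .setSetCell(Mutation.SetCell.newBuilder()
                    .setFamilyName("cf")
                    .setColumnQualifier(ByteString.copyFromUtf8("col"))
                    .setValue(ByteString.copyFromUtf8("value-" + i))
                    .build())
                .build())
            .build();
        // Each add() returns a future; rows are sent in batches rather than one RPC per row.
        bulkMutation.add(request);
      }

      // Mirror Writer.close(): push out any buffered mutations before tearing down.
      bulkMutation.flush();
      executor.flush();
    }
  }
}

Batching row mutations this way amortizes per-request and per-round-trip overhead on the gRPC channel, which is where the commit message's "5x write throughput in batch jobs" figure comes from.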
