[ 
https://issues.apache.org/jira/browse/BEAM-3412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336828#comment-16336828
 ] 

ASF GitHub Bot commented on BEAM-3412:
--------------------------------------

sduskis closed pull request #4462: [BEAM-3412] Upgrade Cloud Bigtable to 1.0.0
URL: https://github.com/apache/beam/pull/4462
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index e51ee28bd8e..10086b4824a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -27,7 +27,7 @@ apply from: project(":").file("build_rules.gradle")
 // a dependency version which should match across multiple
 // Maven artifacts.
 def google_cloud_bigdataoss_version = "1.4.5"
-def bigtable_version = "1.0.0-pre3"
+def bigtable_version = "1.0.0"
 def google_clients_version = "1.22.0"
 def google_auth_version = "0.7.1"
 def grpc_version = "1.2.0"
diff --git a/pom.xml b/pom.xml
index fd65b2d9606..627551f68a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@
     <args4j.version>2.33</args4j.version>
     <avro.version>1.8.2</avro.version>
     <bigquery.version>v2-rev355-1.22.0</bigquery.version>
-    <bigtable.version>1.0.0-pre3</bigtable.version>
+    <bigtable.version>1.0.0</bigtable.version>
     <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
     <pubsubgrpc.version>0.1.18</pubsubgrpc.version>
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 06c459bb447..2206bdd86d4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -18,8 +18,8 @@
 package org.apache.beam.sdk.io.gcp.bigtable;
 
 import com.google.bigtable.admin.v2.GetTableRequest;
-import com.google.bigtable.v2.MutateRowRequest;
 import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.MutateRowsRequest;
 import com.google.bigtable.v2.Mutation;
 import com.google.bigtable.v2.ReadRowsRequest;
 import com.google.bigtable.v2.Row;
@@ -30,9 +30,9 @@
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableName;
-import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
 import com.google.cloud.bigtable.grpc.async.BulkMutation;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -94,13 +94,15 @@ public boolean tableExists(String tableId) throws 
IOException {
     }
   }
 
-  private class BigtableReaderImpl implements Reader {
+  @VisibleForTesting
+  static class BigtableReaderImpl implements Reader {
     private BigtableSession session;
     private final BigtableSource source;
     private ResultScanner<Row> results;
     private Row currentRow;
 
-    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
+    @VisibleForTesting
+    BigtableReaderImpl(BigtableSession session, BigtableSource source) {
       this.session = session;
       this.source = source;
     }
@@ -119,7 +121,8 @@ public boolean start() throws IOException {
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder()
               .setRows(rowSet)
-              
.setTableName(options.getInstanceName().toTableNameStr(source.getTableId()));
+              .setTableName(session.getOptions().getInstanceName()
+                  .toTableNameStr(source.getTableId()));
       if (source.getRowFilter() != null) {
         requestB.setFilter(source.getRowFilter());
       }
@@ -166,17 +169,14 @@ public Row getCurrentRow() throws NoSuchElementException {
     }
   }
 
-  private static class BigtableWriterImpl implements Writer {
+  @VisibleForTesting
+  static class BigtableWriterImpl implements Writer {
     private BigtableSession session;
-    private AsyncExecutor executor;
     private BulkMutation bulkMutation;
-    private final String tableName;
 
-    public BigtableWriterImpl(BigtableSession session, BigtableTableName 
tableName) {
+    BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
       this.session = session;
-      executor = session.createAsyncExecutor();
-      bulkMutation = session.createBulkMutation(tableName, executor);
-      this.tableName = tableName.toString();
+      bulkMutation = session.createBulkMutation(tableName);
     }
 
     @Override
@@ -189,7 +189,6 @@ public void flush() throws IOException {
           // We fail since flush() operation was interrupted.
           throw new IOException(e);
         }
-        executor.flush();
       }
     }
 
@@ -205,8 +204,6 @@ public void close() throws IOException {
             throw new IOException(e);
           }
           bulkMutation = null;
-          executor.flush();
-          executor = null;
         }
       } finally {
         if (session != null) {
@@ -220,13 +217,17 @@ public void close() throws IOException {
     public ListenableFuture<MutateRowResponse> writeRecord(
         KV<ByteString, Iterable<Mutation>> record)
         throws IOException {
-      MutateRowRequest r =
-          MutateRowRequest.newBuilder()
-              .setTableName(tableName)
-              .setRowKey(record.getKey())
-              .addAllMutations(record.getValue())
-              .build();
-      return bulkMutation.add(r);
+      MutateRowsRequest.Entry.Builder requestBuilder = 
MutateRowsRequest.Entry.newBuilder();
+      requestBuilder.setRowKey(record.getKey());
+
+      // TODO: Ideally, this should use 
`requestBuilder.addAllMutations(record.getValue());`
+      //       Unfortunately, the version of protobuf used by Cloud Bigtable 
is not compatible
+      //       with the version used by beam. Specifically, the follow does 
not work:
+      //       `requestBuilder.addAllMutations(record.getValue());`.
+      for (Mutation mutation : record.getValue()) {
+        requestBuilder.addMutations(mutation);
+      }
+      return bulkMutation.add(requestBuilder.build());
     }
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
new file mode 100644
index 00000000000..2a193599014
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsRequest.Entry;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Mutation.SetCell;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.Row;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableDataClient;
+import com.google.cloud.bigtable.grpc.BigtableInstanceName;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableName;
+import com.google.cloud.bigtable.grpc.async.BulkMutation;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests of BigtableServiceImpl.
+ */
+@RunWith(JUnit4.class)
+public class BigtableServiceImplTest {
+
+  private static final BigtableTableName TABLE_NAME =
+      new BigtableInstanceName("project", "instance").toTableName("table");
+
+  @Mock
+  private BigtableSession mockSession;
+
+  @Mock
+  private BulkMutation mockBulkMutation;
+
+  @Mock
+  private BigtableDataClient mockBigtableDataClient;
+
+  @Mock
+  private BigtableSource mockBigtableSource;
+
+  @Before
+  public void setup(){
+    MockitoAnnotations.initMocks(this);
+    BigtableOptions options = new BigtableOptions.Builder()
+        .setProjectId("project")
+        .setInstanceId("instance")
+        .build();
+    when(mockSession.getOptions()).thenReturn(options);
+    
when(mockSession.createBulkMutation(eq(TABLE_NAME))).thenReturn(mockBulkMutation);
+    when(mockSession.getDataClient()).thenReturn(mockBigtableDataClient);
+  }
+
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BigtableDataClient} work
+   * as expected.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testRead() throws IOException, InterruptedException {
+    ByteKey start = ByteKey.copyFrom("a".getBytes());
+    ByteKey end = ByteKey.copyFrom("b".getBytes());
+    
when(mockBigtableSource.getRanges()).thenReturn(Arrays.asList(ByteKeyRange.of(start,
 end)));
+    @SuppressWarnings("unchecked")
+    ResultScanner<Row> mockResultScanner = Mockito.mock(ResultScanner.class);
+    Row expectedRow = Row.newBuilder()
+        .setKey(ByteString.copyFromUtf8("a"))
+        .build();
+    when(mockResultScanner.next())
+        .thenReturn(expectedRow)
+        .thenReturn(null);
+    
when(mockBigtableDataClient.readRows(any(ReadRowsRequest.class))).thenReturn(mockResultScanner);
+    BigtableService.Reader underTest =
+        new BigtableServiceImpl.BigtableReaderImpl(mockSession, 
mockBigtableSource);
+
+    underTest.start();
+    Assert.assertEquals(expectedRow, underTest.getCurrentRow());
+    Assert.assertFalse(underTest.advance());
+    underTest.close();
+
+    verify(mockResultScanner, times(1)).close();
+  }
+
+  /**
+   * This test ensures that protobuf creation and interactions with {@link 
BulkMutation} work as
+   * expected.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testWrite() throws IOException, InterruptedException {
+    BigtableService.Writer underTest =
+        new BigtableServiceImpl.BigtableWriterImpl(mockSession, TABLE_NAME);
+
+    Mutation mutation = Mutation.newBuilder()
+        
.setSetCell(SetCell.newBuilder().setFamilyName("Family").build()).build();
+    ByteString key = ByteString.copyFromUtf8("key");
+    underTest.writeRecord(KV.of(key, (Iterable<Mutation>) 
Arrays.asList(mutation)));
+    Entry expected = MutateRowsRequest.Entry.newBuilder()
+        .setRowKey(key)
+        .addMutations(mutation)
+        .build();
+    verify(mockBulkMutation, times(1)).add(expected);
+
+    underTest.close();
+    verify(mockBulkMutation, times(1)).flush();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Update BigTable client version to 1.0
> -------------------------------------
>
>                 Key: BEAM-3412
>                 URL: https://issues.apache.org/jira/browse/BEAM-3412
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-gcp
>            Reporter: Chamikara Jayalath
>            Assignee: Solomon Duskis
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to