This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fcd0c91844 Add Cloud Bigtable Change Stream integration tests (#29127)
9fcd0c91844 is described below

commit 9fcd0c9184479e5d3211d82747fd4f56e708cbe5
Author: Tony Tang <tonycant...@gmail.com>
AuthorDate: Tue Oct 31 10:45:30 2023 -0400

    Add Cloud Bigtable Change Stream integration tests (#29127)
    
    * Add Cloud Bigtable Change Stream integration tests
    
    Change-Id: I68a877d5686f1898686b18491c6b4aff5e699862
    
    * Add default value to bigtable environment endpoint
    
    Change-Id: I490fca7ba2f24b15faa288d6f2c3f209db59f948
    
    * Move Bigtable options to main and register them automatically so we can set instanceId
    
    Change-Id: I4d5e5347dbba09a0e1ee170b8aa911d2f0a772ef
    
    * Remove bigtableProject from BigtableTestOptions. There was no way to set it explicitly prior to this.
    
    Change-Id: I5ee3c663d3120ee85970ae5f24a962b9535323b3
    
    * Add comment explaining why we build the test pipeline in a different package
    
    Change-Id: I228dc61ca0b27131cd38a3ab24f136d0f924d9f7
    
    * Change instanceID to bigtableInstanceId to clarify the value we're specifying
    
    Change-Id: Ic66c4c061ed2f5979f6a530905e3cbbddd238f18
    
    * Change BigtableChangeStreamTestOptions to use more specific field names to avoid conflicts
    
    Change-Id: I489850e07812058e8c7ebb3c9878eae9d4bc9f06
---
 sdks/java/io/google-cloud-platform/build.gradle    |   2 +
 .../BigtableChangeStreamTestOptions.java           |  30 ++
 .../gcp/common/GcpIoPipelineOptionsRegistrar.java  |   2 +
 .../sdk/io/gcp/bigtable/BigtableTestUtils.java     |  27 +-
 .../changestreams/it/BigtableChangeStreamIT.java   | 361 +++++++++++++++++++++
 .../it/BigtableClientIntegrationTestOverride.java  |  83 +++++
 6 files changed, 504 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/build.gradle 
b/sdks/java/io/google-cloud-platform/build.gradle
index d66122e4d10..b0122035a01 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -187,12 +187,14 @@ task integrationTest(type: Test, dependsOn: 
processTestResources) {
   def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 
'gs://temp-storage-for-end-to-end-tests'
   def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
   def firestoreHost = project.findProperty('firestoreHost') ?: 
'batch-firestore.googleapis.com:443'
+  def bigtableChangeStreamInstanceId = 
project.findProperty('bigtableChangeStreamInstanceId') ?: 'beam-test'
   systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
           "--runner=DirectRunner",
           "--project=${gcpProject}",
           "--tempRoot=${gcpTempRoot}",
           "--firestoreDb=${firestoreDb}",
           "--firestoreHost=${firestoreHost}",
+          "--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}",
   ])
 
   // Disable Gradle cache: these ITs interact with live service that should 
always be considered "out of date"
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java
new file mode 100644
index 00000000000..71303a0e84a
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/BigtableChangeStreamTestOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Pipeline options read by the Bigtable change stream integration tests.
+ *
+ * <p>Registered through {@code GcpIoPipelineOptionsRegistrar} so that
+ * {@code --bigtableChangeStreamInstanceId} can be passed on the test command line.
+ */
+public interface BigtableChangeStreamTestOptions extends TestPipelineOptions {
+  /** Sets the Bigtable instance ID used by the change stream tests. */
+  void setBigtableChangeStreamInstanceId(String value);
+
+  /** Returns the Bigtable instance ID used by the change stream tests; defaults to "beam-test". */
+  @Default.String("beam-test")
+  @Description("Instance ID for Bigtable Change Stream")
+  String getBigtableChangeStreamInstanceId();
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
index f1ff827fc63..6cfc03c9eaa 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/common/GcpIoPipelineOptionsRegistrar.java
@@ -21,6 +21,7 @@ import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions;
 import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -38,6 +39,7 @@ public class GcpIoPipelineOptionsRegistrar implements 
PipelineOptionsRegistrar {
         .add(PubsubOptions.class)
         .add(FirestoreOptions.class)
         .add(TestBigQueryOptions.class)
+        .add(BigtableChangeStreamTestOptions.class)
         .build();
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
index c35b7c54c4d..6bd2f3b25b3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestUtils.java
@@ -31,12 +31,14 @@ import com.google.bigtable.v2.Family;
 import com.google.bigtable.v2.Mutation;
 import com.google.protobuf.ByteString;
 import java.util.List;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.KV;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
+import org.joda.time.Instant;
 
-class BigtableTestUtils {
+public class BigtableTestUtils {
 
   static final String BOOL_COLUMN = "boolColumn";
   static final String LONG_COLUMN = "longColumn";
@@ -144,4 +146,27 @@ class BigtableTestUtils {
     }
     return builder.build();
   }
+
+  /**
+   * Builds a change stream read bounded by an explicit end time, for use in integration tests.
+   *
+   * <p>This lives in the {@code bigtable} package instead of {@code changestreams} because
+   * {@code withEndTime} is package-private, so only code in this package can set it. Setting an
+   * end time is what allows the tests to terminate predictably.
+   *
+   * @param projectId GCP project that owns the Bigtable instance
+   * @param instanceId Bigtable instance containing the table
+   * @param tableId table whose change stream is read
+   * @param appProfileId app profile used for the read
+   * @param metadataTableName table ID passed to {@code withMetadataTableTableId}
+   * @param startTime start of the change stream read
+   * @param endTime end of the change stream read
+   * @param clientOverride client settings override (e.g. custom endpoints)
+   * @return the configured {@link BigtableIO.ReadChangeStream} transform
+   */
+  public static BigtableIO.ReadChangeStream buildTestPipelineInput(
+      String projectId,
+      String instanceId,
+      String tableId,
+      String appProfileId,
+      String metadataTableName,
+      Instant startTime,
+      Instant endTime,
+      BigtableClientOverride clientOverride) {
+    // Where to read from.
+    BigtableIO.ReadChangeStream read =
+        BigtableIO.readChangeStream()
+            .withProjectId(projectId)
+            .withInstanceId(instanceId)
+            .withTableId(tableId)
+            .withAppProfileId(appProfileId)
+            .withMetadataTableTableId(metadataTableName);
+    // The time window, including the package-private end time.
+    read = read.withStartTime(startTime).withEndTime(endTime);
+    return read.withBigtableClientOverride(clientOverride);
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java
new file mode 100644
index 00000000000..e6455cbfd58
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.it;
+
+import com.google.api.gax.batching.Batcher;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
+import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.Range;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.common.IOITHelper;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableTestUtils;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.BigtableChangeStreamTestOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * End-to-end tests of Bigtable Change Stream.
+ *
+ * <p>{@link #beforeClass()} creates a change-stream-enabled table with two column families. Each
+ * test writes mutations to that table, then reads them back with a change stream pipeline bounded
+ * by an end time and asserts that the emitted mutations equal the ones written. {@link
+ * #afterClass()} deletes the tables and closes every client, even if one cleanup step fails.
+ */
+@RunWith(JUnit4.class)
+public class BigtableChangeStreamIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamIT.class);
+  private static final String COLUMN_FAMILY1 = "CF";
+  private static final String COLUMN_FAMILY2 = "CF2";
+  private static final String COLUMN_QUALIFIER = "CQ";
+  private static String projectId;
+  private static String instanceId;
+  private static String tableId;
+  private static String appProfileId;
+  private static String metadataTableId;
+  private static BigtableTableAdminClient adminClient;
+  private static BigtableDataClient dataClient;
+  private static BigtableClientIntegrationTestOverride bigtableClientOverride;
+  private static Batcher<RowMutationEntry, Void> mutationBatcher;
+  private static BigtableChangeStreamTestOptions options;
+  private transient TestPipeline pipeline;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    options = IOITHelper.readIOTestPipelineOptions(BigtableChangeStreamTestOptions.class);
+    LOG.info("Pipeline options: {}", options);
+    projectId = options.as(GcpOptions.class).getProject();
+    instanceId = options.getBigtableChangeStreamInstanceId();
+
+    // A millisecond timestamp suffix gives each run its own table names.
+    long randomId = Instant.now().getMillis();
+    tableId = "beam-change-stream-test-" + randomId;
+    metadataTableId = "beam-change-stream-test-md-" + randomId;
+    appProfileId = "default";
+
+    bigtableClientOverride = new BigtableClientIntegrationTestOverride();
+    LOG.info(bigtableClientOverride.toString());
+
+    BigtableDataSettings.Builder dataSettingsBuilder = BigtableDataSettings.newBuilder();
+    BigtableTableAdminSettings.Builder tableAdminSettingsBuilder =
+        BigtableTableAdminSettings.newBuilder();
+    dataSettingsBuilder.setProjectId(projectId);
+    tableAdminSettingsBuilder.setProjectId(projectId);
+    dataSettingsBuilder.setInstanceId(instanceId);
+    tableAdminSettingsBuilder.setInstanceId(instanceId);
+    dataSettingsBuilder.setAppProfileId(appProfileId);
+    // TODO: Remove this later. But for now, disable direct path.
+    dataSettingsBuilder
+        .stubSettings()
+        .setTransportChannelProvider(
+            EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder()
+                .setAttemptDirectPath(false)
+                .build());
+
+    bigtableClientOverride.updateDataClientSettings(dataSettingsBuilder);
+    bigtableClientOverride.updateTableAdminClientSettings(tableAdminSettingsBuilder);
+
+    // These clients are used to modify the table and write to it
+    dataClient = BigtableDataClient.create(dataSettingsBuilder.build());
+    adminClient = BigtableTableAdminClient.create(tableAdminSettingsBuilder.build());
+
+    // Create change stream enabled table
+    adminClient.createTable(
+        CreateTableRequest.of(tableId)
+            .addChangeStreamRetention(org.threeten.bp.Duration.ofDays(1))
+            .addFamily(COLUMN_FAMILY1)
+            .addFamily(COLUMN_FAMILY2));
+
+    mutationBatcher = dataClient.newBulkMutationBatcher(tableId);
+  }
+
+  @Before
+  public void before() {
+    pipeline = TestPipeline.fromOptions(options).enableAbandonedNodeEnforcement(false);
+  }
+
+  /**
+   * Closes the batcher, deletes the test tables and closes both clients.
+   *
+   * <p>Each step runs in a {@code finally} of the previous one, so a failure in one step (for
+   * example a missing metadata table) no longer leaks the remaining clients.
+   */
+  @AfterClass
+  public static void afterClass() {
+    try {
+      closeMutationBatcher();
+    } finally {
+      try {
+        dropTestTablesAndCloseAdminClient();
+      } finally {
+        if (dataClient != null) {
+          dataClient.close();
+        }
+      }
+    }
+  }
+
+  /** Closes {@link #mutationBatcher}; it was created from {@link #dataClient}, so close it first. */
+  private static void closeMutationBatcher() {
+    if (mutationBatcher == null) {
+      return;
+    }
+    try {
+      mutationBatcher.close();
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the caller; cleanup of the other resources still proceeds.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while closing the mutation batcher", e);
+    }
+  }
+
+  /** Deletes the test and metadata tables when they exist, then closes {@link #adminClient}. */
+  private static void dropTestTablesAndCloseAdminClient() {
+    if (adminClient == null) {
+      return;
+    }
+    try {
+      if (adminClient.exists(tableId)) {
+        adminClient.updateTable(UpdateTableRequest.of(tableId).disableChangeStreamRetention());
+        adminClient.deleteTable(tableId);
+      }
+      // The metadata table is not created by this class (presumably the change stream read
+      // creates it), so it may be absent, e.g. when no pipeline ran.
+      if (adminClient.exists(metadataTableId)) {
+        adminClient.deleteTable(metadataTableId);
+      }
+    } finally {
+      adminClient.close();
+    }
+  }
+
+  @Test
+  public void testReadBigtableChangeStream() throws InterruptedException {
+    Instant startTime = Instant.now();
+    String rowKey = "rowKeySetCell";
+    RowMutationEntry setCellEntry =
+        RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
+    writeAndFlush(setCellEntry);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream).containsInAnyOrder(setCellEntry.toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDeleteRow() throws InterruptedException {
+    Instant startTime = Instant.now();
+    String rowKeyToDelete = "rowKeyToDelete";
+    RowMutationEntry setCellMutationToDelete =
+        RowMutationEntry.create(rowKeyToDelete)
+            .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
+    RowMutationEntry deleteRowMutation = RowMutationEntry.create(rowKeyToDelete).deleteRow();
+    writeAndFlush(setCellMutationToDelete);
+    writeAndFlush(deleteRowMutation);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream)
+        .containsInAnyOrder(
+            setCellMutationToDelete.toProto(),
+            // Delete row becomes one deleteFamily per family
+            RowMutationEntry.create(rowKeyToDelete)
+                .deleteFamily(COLUMN_FAMILY1)
+                .deleteFamily(COLUMN_FAMILY2)
+                .toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDeleteColumnFamily() throws InterruptedException {
+    Instant startTime = Instant.now();
+    String cellValue = "cell value 1";
+    String rowKeyMultiFamily = "rowKeyMultiFamily";
+    RowMutationEntry setCells =
+        RowMutationEntry.create(rowKeyMultiFamily)
+            .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, cellValue)
+            .setCell(COLUMN_FAMILY2, COLUMN_QUALIFIER, cellValue);
+    writeAndFlush(setCells);
+    RowMutationEntry deleteCF2 =
+        RowMutationEntry.create(rowKeyMultiFamily).deleteFamily(COLUMN_FAMILY2);
+    writeAndFlush(deleteCF2);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream).containsInAnyOrder(setCells.toProto(), deleteCF2.toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDeleteCell() throws InterruptedException {
+    Instant startTime = Instant.now();
+    String cellValue = "cell value 1";
+    String rowKeyMultiCell = "rowKeyMultiCell";
+    RowMutationEntry setCells =
+        RowMutationEntry.create(rowKeyMultiCell)
+            .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, cellValue)
+            .setCell(COLUMN_FAMILY1, "CQ2", cellValue);
+    writeAndFlush(setCells);
+    RowMutationEntry deleteCQ2 =
+        RowMutationEntry.create(rowKeyMultiCell)
+            // need to set timestamp range to make change stream output match
+            .deleteCells(
+                COLUMN_FAMILY1, ByteString.copyFromUtf8("CQ2"), timestampRangeFrom(startTime));
+    writeAndFlush(deleteCQ2);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream).containsInAnyOrder(setCells.toProto(), deleteCQ2.toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testComplexMutation() throws InterruptedException {
+    Instant startTime = Instant.now();
+    String rowKey = "rowKeyComplex";
+    // We'll delete this in the next mutation
+    RowMutationEntry setCell =
+        RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, "cell value 1");
+    writeAndFlush(setCell);
+    RowMutationEntry complexMutation =
+        RowMutationEntry.create(rowKey)
+            .setCell(COLUMN_FAMILY1, "CQ2", "cell value 2")
+            .setCell(COLUMN_FAMILY1, "CQ3", "cell value 3")
+            // need to set timestamp range to make change stream output match
+            .deleteCells(
+                COLUMN_FAMILY1,
+                ByteString.copyFromUtf8(COLUMN_QUALIFIER),
+                timestampRangeFrom(startTime));
+    writeAndFlush(complexMutation);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream).containsInAnyOrder(setCell.toProto(), complexMutation.toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testLargeMutation() throws InterruptedException {
+    Instant startTime = Instant.now();
+    // test set cell w size > 1MB so it triggers chunking
+    char[] chars = new char[1024 * 1500];
+    Arrays.fill(chars, '\u200B'); // zero-width space
+    String largeString = String.valueOf(chars);
+    String rowKeyLargeCell = "rowKeyLargeCell";
+    RowMutationEntry setLargeCell =
+        RowMutationEntry.create(rowKeyLargeCell)
+            .setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, largeString);
+    writeAndFlush(setLargeCell);
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream).containsInAnyOrder(setLargeCell.toProto());
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testManyMutations() throws InterruptedException {
+    Instant startTime = Instant.now();
+    // Small cell values (3,072 zero-width-space characters): this test exercises many sequential
+    // mutations rather than chunking, which testLargeMutation covers.
+    char[] chars = new char[1024 * 3];
+    Arrays.fill(chars, '\u200B'); // zero-width space
+    String cellValue = String.valueOf(chars);
+
+    ImmutableList.Builder<RowMutationEntry> originalWrites = ImmutableList.builder();
+    for (int i = 0; i < 100; ++i) {
+      String rowKey = "rowKey" + i;
+      RowMutationEntry setCell =
+          RowMutationEntry.create(rowKey).setCell(COLUMN_FAMILY1, COLUMN_QUALIFIER, cellValue);
+      RowMutationEntry deleteFamily = RowMutationEntry.create(rowKey).deleteFamily(COLUMN_FAMILY1);
+      RowMutationEntry deleteCells =
+          RowMutationEntry.create(rowKey)
+              // need to set timestamp range to make change stream output match
+              .deleteCells(
+                  COLUMN_FAMILY1,
+                  ByteString.copyFromUtf8(COLUMN_QUALIFIER),
+                  timestampRangeFrom(startTime));
+      // Apply the mutations one at a time, in order, and record each as an expected output.
+      for (RowMutationEntry mutation : ImmutableList.of(setCell, deleteFamily, deleteCells)) {
+        originalWrites.add(mutation);
+        writeAndFlush(mutation);
+      }
+    }
+    Instant endTime = Instant.now().plus(Duration.standardSeconds(1));
+
+    PCollection<MutateRowsRequest.Entry> changeStream = buildPipeline(startTime, endTime);
+    PAssert.that(changeStream)
+        .containsInAnyOrder(
+            originalWrites.build().stream()
+                .map(RowMutationEntry::toProto)
+                .collect(Collectors.toList()));
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Adds {@code entry} to the shared batcher and flushes it, so each write is issued before the
+   * next one. The future returned by {@code add} is intentionally ignored.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private static void writeAndFlush(RowMutationEntry entry) throws InterruptedException {
+    mutationBatcher.add(entry);
+    mutationBatcher.flush();
+  }
+
+  /**
+   * Returns a timestamp range from {@code startTime} to two minutes later, converted from
+   * milliseconds to microseconds. DeleteCells mutations use it so that the change stream output
+   * matches the written proto.
+   */
+  private static Range.TimestampRange timestampRangeFrom(Instant startTime) {
+    return Range.TimestampRange.create(
+        startTime.getMillis() * 1000,
+        startTime.plus(Duration.standardMinutes(2)).getMillis() * 1000);
+  }
+
+  /** Applies a change stream read of the test table bounded by [startTime, endTime]. */
+  private PCollection<MutateRowsRequest.Entry> buildPipeline(Instant startTime, Instant endTime) {
+    return pipeline
+        .apply(
+            BigtableTestUtils.buildTestPipelineInput(
+                projectId,
+                instanceId,
+                tableId,
+                appProfileId,
+                metadataTableId,
+                startTime,
+                endTime,
+                bigtableClientOverride))
+        .apply(ParDo.of(new ConvertToEntry()));
+  }
+
+  /** Converts each change stream mutation into a MutateRowsRequest entry for comparison. */
+  private static class ConvertToEntry
+      extends DoFn<KV<ByteString, ChangeStreamMutation>, MutateRowsRequest.Entry> {
+    @ProcessElement
+    public void processElement(
+        @Element KV<ByteString, ChangeStreamMutation> element,
+        OutputReceiver<MutateRowsRequest.Entry> out) {
+      out.output(element.getValue().toRowMutationEntry().toProto());
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java
new file mode 100644
index 00000000000..0d6766aa20d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableClientIntegrationTestOverride.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.it;
+
+import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminSettings;
+import com.google.cloud.bigtable.admin.v2.BigtableTableAdminSettings;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.io.Serializable;
+import 
org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
+
+/**
+ * Implements BigtableClientOverride to override data and admin endpoints.
+ *
+ * <p>Endpoints come from the BIGTABLE_ENV_ADMIN_ENDPOINT and BIGTABLE_ENV_DATA_ENDPOINT
+ * environment variables. When those are unset, the public Bigtable endpoints are used.
+ */
+@CheckReturnValue
+final class BigtableClientIntegrationTestOverride implements Serializable, BigtableClientOverride {
+  private static final long serialVersionUID = 4188505491566837311L;
+
+  // Resolved admin API endpoint (environment value or default), read once at class init.
+  private static final String RESOLVED_ADMIN_ENDPOINT =
+      envOrDefault("BIGTABLE_ENV_ADMIN_ENDPOINT", "bigtableadmin.googleapis.com:443");
+  // Resolved data API endpoint (environment value or default), read once at class init.
+  private static final String RESOLVED_DATA_ENDPOINT =
+      envOrDefault("BIGTABLE_ENV_DATA_ENDPOINT", "bigtable.googleapis.com:443");
+
+  private final String adminEndpoint;
+  private final String dataEndpoint;
+
+  /** Creates an override that uses the endpoints resolved when this class was initialized. */
+  BigtableClientIntegrationTestOverride() {
+    this.adminEndpoint = RESOLVED_ADMIN_ENDPOINT;
+    this.dataEndpoint = RESOLVED_DATA_ENDPOINT;
+  }
+
+  /** Returns environment variable {@code name}, or {@code fallback} when it is not set. */
+  private static String envOrDefault(String name, String fallback) {
+    String fromEnv = System.getenv(name);
+    return fromEnv == null ? fallback : fromEnv;
+  }
+
+  /** Points the instance admin client at the admin endpoint. */
+  @Override
+  public void updateInstanceAdminClientSettings(BigtableInstanceAdminSettings.Builder builder) {
+    builder.stubSettings().setEndpoint(adminEndpoint);
+  }
+
+  /** Points the table admin client at the admin endpoint. */
+  @Override
+  public void updateTableAdminClientSettings(BigtableTableAdminSettings.Builder builder) {
+    builder.stubSettings().setEndpoint(adminEndpoint);
+  }
+
+  /** Points the data client at the data endpoint. */
+  @Override
+  public void updateDataClientSettings(BigtableDataSettings.Builder builder) {
+    builder.stubSettings().setEndpoint(dataEndpoint);
+  }
+
+  @Override
+  public String toString() {
+    return "BigtableClientIntegrationTestOverride{adminEndpoint="
+        + adminEndpoint
+        + ", dataEndpoint="
+        + dataEndpoint
+        + "}";
+  }
+}

Reply via email to