This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new eba1ec36adc Bigtable: skip reading large rows (#37586)
eba1ec36adc is described below
commit eba1ec36adcac85d37ab7ea25986ebeec3dada47
Author: Mattie Fu <[email protected]>
AuthorDate: Fri Feb 13 11:51:31 2026 -0500
Bigtable: skip reading large rows (#37586)
* Bigtable: add experimental option to skip reading large rows in the pipeline
* fix test
* fix test
---
.../io/gcp/bigtable/BigtableServiceFactory.java | 7 +++-
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 31 ++++++++++++----
.../beam/sdk/io/gcp/bigtable/BigtableReadIT.java | 41 ++++++++++++++++++++++
.../io/gcp/bigtable/BigtableServiceImplTest.java | 3 +-
4 files changed, 74 insertions(+), 8 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
index 4c7805f6558..9933204d6f6 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
@@ -54,6 +54,8 @@ class BigtableServiceFactory implements Serializable {
private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS =
"bigtable_enable_client_side_metrics";
+ private static final String BIGTABLE_ENABLE_SKIP_LARGE_ROWS =
"bigtable_enable_skip_large_rows";
+
@AutoValue
abstract static class ConfigId implements Serializable {
@@ -133,7 +135,10 @@ class BigtableServiceFactory implements Serializable {
BigtableDataSettings.enableBuiltinMetrics();
}
- BigtableService service = new BigtableServiceImpl(settings);
+ boolean skipLargeRows =
+ ExperimentalOptions.hasExperiment(pipelineOptions,
BIGTABLE_ENABLE_SKIP_LARGE_ROWS);
+
+ BigtableService service = new BigtableServiceImpl(settings,
skipLargeRows);
entry = BigtableServiceEntry.create(configId, service);
entries.put(configId.id(), entry);
refCounts.put(configId.id(), new AtomicInteger(1));
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 3451bbf450c..f7aa50a7437 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -104,10 +104,17 @@ class BigtableServiceImpl implements BigtableService {
private static final double WATERMARK_PERCENTAGE = .1;
private static final long MIN_BYTE_BUFFER_SIZE = 100 * 1024 * 1024; // 100MB
+ private final boolean skipLargeRows;
+
BigtableServiceImpl(BigtableDataSettings settings) throws IOException {
+ this(settings, false);
+ }
+
+ BigtableServiceImpl(BigtableDataSettings settings, boolean skipLargeRows)
throws IOException {
this.projectId = settings.getProjectId();
this.instanceId = settings.getInstanceId();
this.client = BigtableDataClient.create(settings);
+ this.skipLargeRows = skipLargeRows;
LOG.info("Started Bigtable service with settings {}", settings);
}
@@ -142,6 +149,7 @@ class BigtableServiceImpl implements BigtableService {
private ServerStream<Row> stream;
private boolean exhausted;
+ private final boolean skipLargeRows;
@VisibleForTesting
BigtableReaderImpl(
@@ -150,13 +158,15 @@ class BigtableServiceImpl implements BigtableService {
String instanceId,
String tableId,
List<ByteKeyRange> ranges,
- @Nullable RowFilter rowFilter) {
+ @Nullable RowFilter rowFilter,
+ boolean skipLargeRows) {
this.client = client;
this.projectId = projectId;
this.instanceId = instanceId;
this.tableId = tableId;
this.ranges = ranges;
this.rowFilter = rowFilter;
+ this.skipLargeRows = skipLargeRows;
}
@Override
@@ -173,11 +183,19 @@ class BigtableServiceImpl implements BigtableService {
if (rowFilter != null) {
query.filter(Filters.FILTERS.fromProto(rowFilter));
}
+
try {
- stream =
- client
- .readRowsCallable(new BigtableRowProtoAdapter())
- .call(query, GrpcCallContext.createDefault());
+ if (skipLargeRows) {
+ stream =
+ client
+ .skipLargeRowsCallable(new BigtableRowProtoAdapter())
+ .call(query, GrpcCallContext.createDefault());
+ } else {
+ stream =
+ client
+ .readRowsCallable(new BigtableRowProtoAdapter())
+ .call(query, GrpcCallContext.createDefault());
+ }
results = stream.iterator();
serviceCallMetric.call("ok");
} catch (StatusRuntimeException e) {
@@ -667,7 +685,8 @@ class BigtableServiceImpl implements BigtableService {
instanceId,
source.getTableId().get(),
source.getRanges(),
- source.getRowFilter());
+ source.getRowFilter(),
+ skipLargeRows);
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 4ce9ad10b2c..1da38e6d083 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -27,6 +27,7 @@ import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.bigtable.data.v2.models.TableId;
import java.io.IOException;
import java.util.Date;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
@@ -34,6 +35,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.Lineage;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -148,6 +150,45 @@ public class BigtableReadIT {
checkLineageSourceMetric(r, tableId);
}
+ @Test
+ public void testE2EBigtableReadWithSkippingLargeRows() {
+
tableAdminClient.createTable(CreateTableRequest.of(tableId).addFamily(COLUMN_FAMILY_NAME));
+
+ // Write a few rows first
+ int numRows = 20;
+ int numLargeRows = 3;
+ // Each mutation can't exceed 100 MB. Break it down to 3 columns
+ String value = StringUtils.repeat("v", 90 * 1000 * 1000);
+ for (int i = 0; i < numLargeRows; i++) {
+ for (int j = 0; j < 3; j++) {
+ client.mutateRow(
+ RowMutation.create(TableId.of(tableId), "large_row-" + i)
+ .setCell(COLUMN_FAMILY_NAME, "q" + i, value));
+ }
+ }
+
+ for (int i = 0; i < numRows - numLargeRows; i++) {
+ client.mutateRow(
+ RowMutation.create(TableId.of(tableId), "row-" + i)
+ .setCell(COLUMN_FAMILY_NAME, "q", "value"));
+ }
+
+ ExperimentalOptions.addExperiment(
+ options.as(ExperimentalOptions.class),
"bigtable_enable_skip_large_rows");
+
+ Pipeline p = Pipeline.create(options);
+ PCollection<Long> count =
+ p.apply(
+ BigtableIO.read()
+ .withProjectId(project)
+ .withInstanceId(options.getInstanceId())
+ .withTableId(tableId))
+ .apply(Count.globally());
+ PAssert.thatSingleton(count).isEqualTo((long) numRows - numLargeRows);
+ PipelineResult r = p.run();
+ checkLineageSourceMetric(r, tableId);
+ }
+
private void checkLineageSourceMetric(PipelineResult r, String tableId) {
// TODO(https://github.com/apache/beam/issues/32071) test malformed,
// when pipeline.run() is non-blocking, the metrics are not available by
the time of query
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
index 0564493ca1a..37d7d89021d 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java
@@ -179,7 +179,8 @@ public class BigtableServiceImplTest {
bigtableDataSettings.getInstanceId(),
mockBigtableSource.getTableId().get(),
mockBigtableSource.getRanges(),
- null);
+ null,
+ false);
underTest.start();
Assert.assertEquals(expectedRow, underTest.getCurrentRow());