justinuang commented on code in PR #34245:
URL: https://github.com/apache/beam/pull/34245#discussion_r1989538930
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -125,6 +125,100 @@ public BigtableWriterImpl openForWriting(BigtableWriteOptions writeOptions) {
writeOptions.getCloseWaitTimeout());
}
+ @VisibleForTesting
+ static class BigtableReaderWithExperimentalOptions implements Reader {
+ private final BigtableDataClient client;
+
+ private final String projectId;
+ private final String instanceId;
+ private final String tableId;
+
+ private final List<ByteKeyRange> ranges;
+ private final RowFilter rowFilter;
+ private Iterator<Row> results;
+
+ private Row currentRow;
+
+ private ServerStream<Row> stream;
+
+ private boolean exhausted;
+
+ @VisibleForTesting
+ BigtableReaderWithExperimentalOptions(
+ BigtableDataClient client,
+ String projectId,
+ String instanceId,
+ String tableId,
+ List<ByteKeyRange> ranges,
+ @Nullable RowFilter rowFilter) {
+ this.client = client;
+ this.projectId = projectId;
+ this.instanceId = instanceId;
+ this.tableId = tableId;
+ this.ranges = ranges;
+ this.rowFilter = rowFilter;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ ServiceCallMetric serviceCallMetric = createCallMetric(projectId, instanceId, tableId);
+
+ Query query = Query.create(tableId);
+ for (ByteKeyRange sourceRange : ranges) {
+ query.range(
+ ByteString.copyFrom(sourceRange.getStartKey().getValue()),
+ ByteString.copyFrom(sourceRange.getEndKey().getValue()));
+ }
+
+ if (rowFilter != null) {
+ query.filter(Filters.FILTERS.fromProto(rowFilter));
+ }
+ try {
+ stream =
+ client
+ .skipLargeRowsCallable(new BigtableRowProtoAdapter())
Review Comment:
I forget, does this throw an exception with the large rows, or just silently
swallow them?
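(Editorial aside, not part of the review thread: the question above contrasts two possible behaviors for oversized rows. A minimal plain-Java sketch of the two semantics, assuming a byte-size limit — this is a hypothetical stand-in for what `skipLargeRowsCallable` may do, not the actual client implementation, and the silent-drop behavior is exactly the assumption the reviewer is asking to confirm:)

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the two behaviors the review question contrasts:
 * silently skipping rows over a size limit vs. throwing on them.
 * Stand-in for the real Bigtable client; the limit here is a few bytes
 * for illustration, not the real 256MB.
 */
public class LargeRowHandling {
  static final int MAX_ROW_BYTES = 8; // illustrative limit, not the real 256MB

  /** "Skip" semantics: oversized rows are dropped without signaling the caller. */
  static List<byte[]> skipLargeRows(List<byte[]> rows) {
    List<byte[]> kept = new ArrayList<>();
    for (byte[] row : rows) {
      if (row.length <= MAX_ROW_BYTES) {
        kept.add(row); // small enough: pass through
      }
      // oversized rows are silently swallowed here
    }
    return kept;
  }

  /** The alternative: fail loudly on the first oversized row. */
  static List<byte[]> failOnLargeRows(List<byte[]> rows) {
    List<byte[]> kept = new ArrayList<>();
    for (byte[] row : rows) {
      if (row.length > MAX_ROW_BYTES) {
        throw new IllegalStateException("row exceeds " + MAX_ROW_BYTES + " bytes");
      }
      kept.add(row);
    }
    return kept;
  }
}
```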
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -631,6 +631,22 @@ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will skip the large rows (>256MB) while reading
+ * This function will switch the base BigtableIO.Reader class to using the
+ * BigtableReaderWithExperimentalOptions. If
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withExperimentSkipLargeRows(@Nullable Boolean skipLargeRows) {
Review Comment:
Experimental
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -631,6 +631,22 @@ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will skip the large rows (>256MB) while reading
+ * This function will switch the base BigtableIO.Reader class to using the
+ * BigtableReaderWithExperimentalOptions. If
+ *
+ * <p>Does not modify this object.
Review Comment:
Maybe mention that this is incompatible with withMaxBufferElementCount(), and that if both are set, only maxBufferElementCount will apply.
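(Editorial aside: the precedence being suggested here can be sketched as a plain-Java resolution step. The names below are hypothetical illustrations, not BigtableIO's actual internals:)

```java
/**
 * Hypothetical sketch of the suggested precedence: when both options are
 * configured, maxBufferElementCount wins and the experimental
 * skip-large-rows flag is ignored. Names are illustrative, not the
 * actual BigtableIO fields.
 */
public class ReadOptionPrecedence {
  enum ReaderKind { DEFAULT, BUFFERED, SKIP_LARGE_ROWS }

  static ReaderKind resolveReader(Integer maxBufferElementCount, Boolean skipLargeRows) {
    if (maxBufferElementCount != null) {
      // withMaxBufferElementCount() takes priority over the experimental flag
      return ReaderKind.BUFFERED;
    }
    if (Boolean.TRUE.equals(skipLargeRows)) {
      return ReaderKind.SKIP_LARGE_ROWS;
    }
    return ReaderKind.DEFAULT;
  }
}
```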
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java:
##########
@@ -631,6 +631,22 @@ public Read withMaxBufferElementCount(@Nullable Integer maxBufferElementCount) {
.build();
}
+ /**
+ * Returns a new {@link BigtableIO.Read} that will skip the large rows (>256MB) while reading
+ * This function will switch the base BigtableIO.Reader class to using the
+ * BigtableReaderWithExperimentalOptions. If
Review Comment:
This feels a bit like an implementation detail that could go out of date.
Perhaps leave this sentence out?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]