sarthakbhutani commented on code in PR #34245:
URL: https://github.com/apache/beam/pull/34245#discussion_r1994188468
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -125,6 +125,100 @@ public BigtableWriterImpl
openForWriting(BigtableWriteOptions writeOptions) {
writeOptions.getCloseWaitTimeout());
}
+ @VisibleForTesting
+ static class BigtableReaderWithExperimentalOptions implements Reader {
+ private final BigtableDataClient client;
+
+ private final String projectId;
+ private final String instanceId;
+ private final String tableId;
+
+ private final List<ByteKeyRange> ranges;
+ private final RowFilter rowFilter;
+ private Iterator<Row> results;
+
+ private Row currentRow;
+
+ private ServerStream<Row> stream;
+
+ private boolean exhausted;
+
+ @VisibleForTesting
+ BigtableReaderWithExperimentalOptions(
+ BigtableDataClient client,
+ String projectId,
+ String instanceId,
+ String tableId,
+ List<ByteKeyRange> ranges,
+ @Nullable RowFilter rowFilter) {
+ this.client = client;
+ this.projectId = projectId;
+ this.instanceId = instanceId;
+ this.tableId = tableId;
+ this.ranges = ranges;
+ this.rowFilter = rowFilter;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ ServiceCallMetric serviceCallMetric = createCallMetric(projectId,
instanceId, tableId);
+
+ Query query = Query.create(tableId);
+ for (ByteKeyRange sourceRange : ranges) {
+ query.range(
+ ByteString.copyFrom(sourceRange.getStartKey().getValue()),
+ ByteString.copyFrom(sourceRange.getEndKey().getValue()));
+ }
+
+ if (rowFilter != null) {
+ query.filter(Filters.FILTERS.fromProto(rowFilter));
+ }
+ try {
+ stream =
+ client
+ .skipLargeRowsCallable(new BigtableRowProtoAdapter())
Review Comment:
It swallows the large rows and returns the next non-large row.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]