igorbernstein2 commented on code in PR #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r873890118
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -210,6 +209,216 @@ public Row getCurrentRow() throws NoSuchElementException {
}
}
+ @VisibleForTesting
+ static class BigtableSegmentReaderImpl implements Reader {
+ private BigtableSession session;
+
+ private @Nullable ReadRowsRequest nextRequest;
+ private @Nullable Row currentRow;
+ private final Queue<Row> buffer;
+ private final long refillSegmentWaterMark;
+ private final long maxSegmentByteSize;
+ private Future<UpstreamResults> future;
+ private ServiceCallMetric serviceCallMetric;
+
+ private static class UpstreamResults {
+ private final List<Row> rows;
+ private final @Nullable ReadRowsRequest nextRequest;
+
+ private UpstreamResults(List<Row> rows, @Nullable ReadRowsRequest
nextRequest) {
+ this.rows = rows;
+ this.nextRequest = nextRequest;
+ }
+ }
+
+ static BigtableSegmentReaderImpl create(BigtableSession session,
BigtableSource source) {
+ RowSet.Builder rowSetBuilder = RowSet.newBuilder();
+ if (source.getRanges().isEmpty()) {
+ rowSetBuilder =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance());
+ } else {
+ for (ByteKeyRange beamRange : source.getRanges()) {
+ RowRange.Builder rangeBuilder = rowSetBuilder.addRowRangesBuilder();
+ rangeBuilder
+
.setStartKeyClosed(ByteString.copyFrom(beamRange.getStartKey().getValue()))
+
.setEndKeyOpen(ByteString.copyFrom(beamRange.getEndKey().getValue()));
+ }
+ }
+
+ RowFilter filter =
+ MoreObjects.firstNonNull(source.getRowFilter(),
RowFilter.getDefaultInstance());
+ ReadRowsRequest request =
+ ReadRowsRequest.newBuilder()
+ .setTableName(
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get()))
+ .setRows(rowSetBuilder.build())
+ .setFilter(filter)
+ .setRowsLimit(source.getMaxBufferElementCount())
+ .build();
+
+ long maxSegmentByteSize =
+ (long) (Runtime.getRuntime().totalMemory() *
DEFAULT_BYTE_LIMIT_PERCENTAGE);
Review Comment:
please make maxSegmentByteSize a ctor parameter that your create() function
populates
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]