igorbernstein2 commented on a change in pull request #16939:
URL: https://github.com/apache/beam/pull/16939#discussion_r837561394
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -30,27 +30,35 @@
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.data.v2.internal.ByteStringComparator;
Review comment:
Please don't use internal classes. It looks like protobuf has its own
comparator, which we should probably use in bigtable:
ByteString#unsignedLexicographicalComparator()
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
Review comment:
lastFetchedRow?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
+
+ private final String tableNameStr;
+ private final int miniBatchLimit = 100;
@VisibleForTesting
BigtableReaderImpl(BigtableSession session, BigtableSource source) {
this.session = session;
+ tableNameStr =
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
this.source = source;
}
@Override
public boolean start() throws IOException {
- RowSet.Builder rowSetBuilder = RowSet.newBuilder();
- for (ByteKeyRange sourceRange : source.getRanges()) {
- rowSetBuilder =
- rowSetBuilder.addRowRanges(
- RowRange.newBuilder()
-
.setStartKeyClosed(ByteString.copyFrom(sourceRange.getStartKey().getValue()))
-
.setEndKeyOpen(ByteString.copyFrom(sourceRange.getEndKey().getValue())));
+ buffer = new ConcurrentLinkedQueue<Row>();
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
}
- RowSet rowSet = rowSetBuilder.build();
-
- String tableNameSr =
-
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ // Sort the rowRanges by startKey
+ Arrays.sort(rowRanges, RANGE_START_COMPARATOR);
+ rowSet =
+ RowSet.newBuilder()
+
.addAllRowRanges(Arrays.stream(rowRanges).collect(Collectors.toList()))
+ .build();
HashMap<String, String> baseLabels = new HashMap<>();
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigTable");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.ReadRows");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD,
"google.bigtable.v2.ReadRowsAsync");
Review comment:
I think this should stay ReadRows...I believe this reflects the RPC name
not the calling convention on the client
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
+
+ private final String tableNameStr;
+ private final int miniBatchLimit = 100;
@VisibleForTesting
BigtableReaderImpl(BigtableSession session, BigtableSource source) {
this.session = session;
+ tableNameStr =
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
this.source = source;
}
@Override
public boolean start() throws IOException {
- RowSet.Builder rowSetBuilder = RowSet.newBuilder();
- for (ByteKeyRange sourceRange : source.getRanges()) {
- rowSetBuilder =
- rowSetBuilder.addRowRanges(
- RowRange.newBuilder()
-
.setStartKeyClosed(ByteString.copyFrom(sourceRange.getStartKey().getValue()))
-
.setEndKeyOpen(ByteString.copyFrom(sourceRange.getEndKey().getValue())));
+ buffer = new ConcurrentLinkedQueue<Row>();
+ RowRange[] rowRanges = new RowRange[source.getRanges().size()];
+ for (int i = 0; i < source.getRanges().size(); i++) {
+ rowRanges[i] =
+ RowRange.newBuilder()
+ .setStartKeyClosed(
+
ByteString.copyFrom(source.getRanges().get(i).getStartKey().getValue()))
+ .setEndKeyOpen(
+
ByteString.copyFrom(source.getRanges().get(i).getEndKey().getValue()))
+ .build();
}
- RowSet rowSet = rowSetBuilder.build();
-
- String tableNameSr =
-
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
+ // Sort the rowRanges by startKey
Review comment:
This comment is not useful; I can see from the code that we are sorting the
rowRanges with RANGE_START_COMPARATOR. The part that's not clear is
why. So your comment should be something along the lines of:
"Presort the ranges so that future segmentation can exit early when
splitting the query"
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
}
- return advance();
}
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ private ReadRowsRequest buildReadRowsRequest() {
+ ReadRowsRequest.Builder request =
+ ReadRowsRequest.newBuilder()
+ .setRows(rowSet)
+ .setRowsLimit(miniBatchLimit)
+ .setTableName(tableNameStr);
+ if (source.getRowFilter() != null) {
+ request.setFilter(source.getRowFilter());
+ }
+ return request.build();
+ }
+
+ private boolean splitRowSet(ByteString splitPoint) {
+ if (RowSet.getDefaultInstance().equals(rowSet)) {
+ rowSet =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+ }
+ RowSet.Builder segment = RowSet.newBuilder();
+ for (int i = 0; i < rowSet.getRowRangesCount(); i++) {
+ RowRange rowRange = rowSet.getRowRanges(i);
+ int startCmp = StartPoint.extract(rowRange).compareTo(new
StartPoint(splitPoint, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new
EndPoint(splitPoint, true));
+
+ if (startCmp > 0) {
+ // If the startKey is passed the split point than add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Row is split, remove all read rowKeys and split RowSet at last
buffered Row
+ RowRange subRange =
rowRange.toBuilder().setStartKeyOpen(splitPoint).build();
+ segment.addRowRanges(subRange);
+ }
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return false;
+ }
+ rowSet = segment.build();
+ return true;
+ }
+
+ private boolean fillReadRowsBuffer() {
+ List<Row> readRows;
+ int amountOfRows = results.size();
+
+ if (amountOfRows == 0) {
+ return false;
+ } else {
+ readRows = results.subList(0, results.size());
Review comment:
Why do you need to keep the list? Shouldn't buffer be enough?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
}
- return advance();
}
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ private ReadRowsRequest buildReadRowsRequest() {
+ ReadRowsRequest.Builder request =
+ ReadRowsRequest.newBuilder()
+ .setRows(rowSet)
+ .setRowsLimit(miniBatchLimit)
+ .setTableName(tableNameStr);
+ if (source.getRowFilter() != null) {
+ request.setFilter(source.getRowFilter());
+ }
+ return request.build();
+ }
+
+ private boolean splitRowSet(ByteString splitPoint) {
+ if (RowSet.getDefaultInstance().equals(rowSet)) {
Review comment:
I would make this explicit: `rowSet.getRanges().isEmpty() &&
rowSet.getKeys().isEmpty()`
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -334,4 +423,94 @@ public Reader createReader(BigtableSource source) throws
IOException {
return session.getDataClient().sampleRowKeys(request);
}
}
+
+ private static final Comparator<RowRange> RANGE_START_COMPARATOR =
+ new Comparator<RowRange>() {
+ @Override
+ public int compare(@Nonnull RowRange o1, @Nonnull RowRange o2) {
+ return StartPoint.extract(o1).compareTo(StartPoint.extract(o2));
+ }
+ };
+
+ /** Helper class to ease comparison of RowRange start points. */
+ private static final class StartPoint implements Comparable<StartPoint> {
+ private final ByteString value;
+ private final boolean isClosed;
+
+ @Nonnull
+ static StartPoint extract(@Nonnull RowRange rowRange) {
+ switch (rowRange.getStartKeyCase()) {
+ case STARTKEY_NOT_SET:
+ return new StartPoint(ByteString.EMPTY, true);
+ case START_KEY_CLOSED:
+ return new StartPoint(rowRange.getStartKeyClosed(), true);
+ case START_KEY_OPEN:
+ if (rowRange.getStartKeyOpen().isEmpty()) {
+ // Take care to normalize an open empty start key to be closed.
+ return new StartPoint(ByteString.EMPTY, true);
+ } else {
+ return new StartPoint(rowRange.getStartKeyOpen(), false);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown startKeyCase: " +
rowRange.getStartKeyCase());
+ }
+ }
+
+ StartPoint(@Nonnull ByteString value, boolean isClosed) {
Review comment:
private
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
Review comment:
There are too many buffers floating around: results, buffer, currentRow
& future. I think you can simplify it to a single queue and a future.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
Review comment:
Please make a separate Reader impl for this behavior so that it's
possible to toggle the behavior.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
+
+ private final String tableNameStr;
+ private final int miniBatchLimit = 100;
Review comment:
I would split this into 2 fields - a constant default value and the
instance-specific value, with a comment to consider making it configurable.
private static final int DEFAULT_MAX_BATCH_SIZE = 100;
...
// TODO: consider making this user configurable
private final int maxBatchSize = DEFAULT_MAX_BATCH_SIZE;
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
Review comment:
I think it would be nice to remove some nesting here:
```java
if (buffer.size() <= minBufferSize && future == null) {
loadReadRowsFuture();
}
if (buffer.isEmpty()) {
waitReadRowsFuture();
}
currentRow = buffer.remove();
return currentRow != null;
```
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
Review comment:
When catching `InterruptedException`, make sure to propagate
the interrupted flag:
`Thread.currentThread().interrupt()`
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
Review comment:
This makes it sound like it's synchronous. I think something like
`startNextSegmentRead()` would be better.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
+
+ private final String tableNameStr;
+ private final int miniBatchLimit = 100;
@VisibleForTesting
BigtableReaderImpl(BigtableSession session, BigtableSource source) {
this.session = session;
+ tableNameStr =
+
session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());
this.source = source;
}
@Override
public boolean start() throws IOException {
- RowSet.Builder rowSetBuilder = RowSet.newBuilder();
- for (ByteKeyRange sourceRange : source.getRanges()) {
- rowSetBuilder =
- rowSetBuilder.addRowRanges(
- RowRange.newBuilder()
-
.setStartKeyClosed(ByteString.copyFrom(sourceRange.getStartKey().getValue()))
-
.setEndKeyOpen(ByteString.copyFrom(sourceRange.getEndKey().getValue())));
+ buffer = new ConcurrentLinkedQueue<Row>();
Review comment:
I don't think this needs to be a concurrent queue. I think you'll be fine
with ArrayDeque.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
Review comment:
This seems weird. What are you trying to do here?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -110,34 +118,49 @@ public boolean tableExists(String tableId) throws
IOException {
static class BigtableReaderImpl implements Reader {
private BigtableSession session;
private final BigtableSource source;
- private ResultScanner<Row> results;
+ private List<Row> results;
private Row currentRow;
+ private Queue<Row> buffer;
+ private RowSet rowSet;
+ private ServiceCallMetric serviceCallMetric;
+ private Future<List<Row>> future;
+ private ByteString lastRowInBuffer;
+
+ private final String tableNameStr;
Review comment:
Don't suffix variable names with the type.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -334,4 +423,94 @@ public Reader createReader(BigtableSource source) throws
IOException {
return session.getDataClient().sampleRowKeys(request);
}
}
+
+ private static final Comparator<RowRange> RANGE_START_COMPARATOR =
+ new Comparator<RowRange>() {
+ @Override
+ public int compare(@Nonnull RowRange o1, @Nonnull RowRange o2) {
+ return StartPoint.extract(o1).compareTo(StartPoint.extract(o2));
+ }
+ };
+
+ /** Helper class to ease comparison of RowRange start points. */
+ private static final class StartPoint implements Comparable<StartPoint> {
+ private final ByteString value;
+ private final boolean isClosed;
+
+ @Nonnull
+ static StartPoint extract(@Nonnull RowRange rowRange) {
+ switch (rowRange.getStartKeyCase()) {
+ case STARTKEY_NOT_SET:
+ return new StartPoint(ByteString.EMPTY, true);
+ case START_KEY_CLOSED:
+ return new StartPoint(rowRange.getStartKeyClosed(), true);
+ case START_KEY_OPEN:
+ if (rowRange.getStartKeyOpen().isEmpty()) {
+ // Take care to normalize an open empty start key to be closed.
+ return new StartPoint(ByteString.EMPTY, true);
+ } else {
+ return new StartPoint(rowRange.getStartKeyOpen(), false);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown startKeyCase: " +
rowRange.getStartKeyCase());
+ }
+ }
+
+ StartPoint(@Nonnull ByteString value, boolean isClosed) {
+ this.value = value;
+ this.isClosed = isClosed;
+ }
+
+ @Override
+ public int compareTo(@Nonnull StartPoint o) {
+ return ComparisonChain.start()
+ // Empty string comes first
+ .compareTrueFirst(value.isEmpty(), o.value.isEmpty())
+ .compare(value, o.value, ByteStringComparator.INSTANCE)
+ // Closed start point comes before an open start point: [x,y] starts
before (x,y].
+ .compareTrueFirst(isClosed, o.isClosed)
+ .result();
+ }
+ }
+
+ /** Helper class to ease comparison of RowRange endpoints. */
+ private static final class EndPoint implements Comparable<EndPoint> {
+ private final ByteString value;
+ private final boolean isClosed;
+
+ @Nonnull
+ static EndPoint extract(@Nonnull RowRange rowRange) {
+ switch (rowRange.getEndKeyCase()) {
+ case ENDKEY_NOT_SET:
+ return new EndPoint(ByteString.EMPTY, true);
+ case END_KEY_CLOSED:
+ return new EndPoint(rowRange.getEndKeyClosed(), true);
+ case END_KEY_OPEN:
+ if (rowRange.getEndKeyOpen().isEmpty()) {
+ // Take care to normalize an open empty end key to be closed.
+ return new EndPoint(ByteString.EMPTY, true);
+ } else {
+ return new EndPoint(rowRange.getEndKeyOpen(), false);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown endKeyCase: " +
rowRange.getEndKeyCase());
+ }
+ }
+
+ EndPoint(@Nonnull ByteString value, boolean isClosed) {
Review comment:
private
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
}
- return advance();
}
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ private ReadRowsRequest buildReadRowsRequest() {
+ ReadRowsRequest.Builder request =
+ ReadRowsRequest.newBuilder()
+ .setRows(rowSet)
+ .setRowsLimit(miniBatchLimit)
+ .setTableName(tableNameStr);
+ if (source.getRowFilter() != null) {
+ request.setFilter(source.getRowFilter());
+ }
+ return request.build();
+ }
+
+ private boolean splitRowSet(ByteString splitPoint) {
+ if (RowSet.getDefaultInstance().equals(rowSet)) {
+ rowSet =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+ }
+ RowSet.Builder segment = RowSet.newBuilder();
+ for (int i = 0; i < rowSet.getRowRangesCount(); i++) {
+ RowRange rowRange = rowSet.getRowRanges(i);
+ int startCmp = StartPoint.extract(rowRange).compareTo(new
StartPoint(splitPoint, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new
EndPoint(splitPoint, true));
+
+ if (startCmp > 0) {
+ // If the startKey is passed the split point than add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Row is split, remove all read rowKeys and split RowSet at last
buffered Row
+ RowRange subRange =
rowRange.toBuilder().setStartKeyOpen(splitPoint).build();
+ segment.addRowRanges(subRange);
+ }
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return false;
+ }
+ rowSet = segment.build();
+ return true;
+ }
+
+ private boolean fillReadRowsBuffer() {
Review comment:
please move this closer to `waitReadRowsFuture`, it makes it easier to
read
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
}
- return advance();
}
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ private ReadRowsRequest buildReadRowsRequest() {
+ ReadRowsRequest.Builder request =
+ ReadRowsRequest.newBuilder()
+ .setRows(rowSet)
+ .setRowsLimit(miniBatchLimit)
+ .setTableName(tableNameStr);
+ if (source.getRowFilter() != null) {
+ request.setFilter(source.getRowFilter());
+ }
+ return request.build();
+ }
+
+ private boolean splitRowSet(ByteString splitPoint) {
+ if (RowSet.getDefaultInstance().equals(rowSet)) {
+ rowSet =
RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
+ }
+ RowSet.Builder segment = RowSet.newBuilder();
+ for (int i = 0; i < rowSet.getRowRangesCount(); i++) {
+ RowRange rowRange = rowSet.getRowRanges(i);
+ int startCmp = StartPoint.extract(rowRange).compareTo(new
StartPoint(splitPoint, true));
+ int endCmp = EndPoint.extract(rowRange).compareTo(new
EndPoint(splitPoint, true));
+
+ if (startCmp > 0) {
+ // If the startKey is passed the split point than add the whole range
+ segment.addRowRanges(rowRange);
+ } else if (endCmp > 0) {
+ // Row is split, remove all read rowKeys and split RowSet at last
buffered Row
+ RowRange subRange =
rowRange.toBuilder().setStartKeyOpen(splitPoint).build();
+ segment.addRowRanges(subRange);
+ }
+ }
+ if (segment.getRowRangesCount() == 0) {
+ return false;
+ }
+ rowSet = segment.build();
+ return true;
+ }
+
+ private boolean fillReadRowsBuffer() {
+ List<Row> readRows;
+ int amountOfRows = results.size();
+
+ if (amountOfRows == 0) {
+ return false;
+ } else {
+ readRows = results.subList(0, results.size());
+ }
+ lastRowInBuffer = readRows.get(readRows.size() - 1).getKey();
+ buffer.addAll(readRows);
+ return true;
Review comment:
What happens when this returns fewer than the requested # of rows? I
think you can save an RPC in most cases by storing an upstreamExhausted flag,
which is set to `amountOfRows < maxBatchSize`. Then you can check this
flag before pulling more data from upstream.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
Review comment:
I think we need to limit the # of bytes as well, i.e. if the customer has
256MiB rows, we will OOM trying to buffer 100 of them. Maybe we can add a TODO
here and add it as a feature in the second pass. But basically it would do
something like:
```java
SettableFuture f = SettableFuture.create();
client.readFlatRowsAsync(request, new StreamObserver<FlatRow> {
List<FlatRow> rows = new ArrayList();
long currentSize = 0;
onStart(Controller c) {
this.controller = c;
}
onResponse(FlatRow row) {
rows.add(row);
currentSize += row.getKey().size() +
row.getCells().stream().mapToLong(c -> c.getQualifier().size() +
c.getValue().size()).sum();
if (currentSize > LIMIT) {
controller.cancel();
}
}
onComplete() {
f.set(rows);
}
onError(Throwable e) {
if (e instanceof CancellationException) {
f.set(rows);
return;
}
f.setException(e)
}
});
```
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
##########
@@ -154,27 +177,98 @@ public boolean start() throws IOException {
session.getOptions().getProjectId(),
session.getOptions().getInstanceId(),
source.getTableId().get()));
- ServiceCallMetric serviceCallMetric =
+ serviceCallMetric =
new
ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
- ReadRowsRequest.Builder requestB =
-
ReadRowsRequest.newBuilder().setRows(rowSet).setTableName(tableNameSr);
- if (source.getRowFilter() != null) {
- requestB.setFilter(source.getRowFilter());
+
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (buffer.isEmpty()) {
+ if (future == null || !waitReadRowsFuture()) {
+ return false;
+ }
+ } else if (future == null && buffer.size() < 10) {
+ loadReadRowsFuture();
+ }
+ currentRow = buffer.remove();
+ return currentRow != null;
+ }
+
+ private void loadReadRowsFuture() {
+ if (!splitRowSet(lastRowInBuffer)) {
+ return;
}
+ future = session.getDataClient().readRowsAsync(buildReadRowsRequest());
+ }
+
+ private boolean waitReadRowsFuture() throws IOException {
try {
- results = session.getDataClient().readRows(requestB.build());
+ results = future.get();
+ future = null;
serviceCallMetric.call("ok");
+ return fillReadRowsBuffer();
} catch (StatusRuntimeException e) {
serviceCallMetric.call(e.getStatus().getCode().value());
throw e;
+ } catch (InterruptedException | ExecutionException e) {
+ serviceCallMetric.call(e.getCause().toString());
+ throw new IOException(e);
}
- return advance();
}
- @Override
- public boolean advance() throws IOException {
- currentRow = results.next();
- return currentRow != null;
+ private ReadRowsRequest buildReadRowsRequest() {
Review comment:
I would put this before `loadReadRowsFuture`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]