Repository: incubator-beam Updated Branches: refs/heads/master 9d6d622ab -> e01efbda3
Implement getSplitPointsConsumed() in BigtableIO Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0f809a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0f809a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0f809a0 Branch: refs/heads/master Commit: f0f809a0960d003b178c82d268598f69456bc8a0 Parents: 9d6d622 Author: Ian Zhou <[email protected]> Authored: Wed Jun 29 12:03:27 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Wed Jun 29 20:47:45 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 11 +++++-- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 32 +++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0f809a0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index f725a66..cddb333 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -765,7 +765,8 @@ public class BigtableIO { reader = service.createReader(getCurrentSource()); boolean hasRecord = reader.start() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); + && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; } @@ -781,7 +782,8 @@ public class BigtableIO { public boolean advance() throws IOException { boolean hasRecord = reader.advance() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())); + && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; } @@ -808,6 +810,11 @@ public class BigtableIO { } @Override + public final long getSplitPointsConsumed() { + return rangeTracker.getSplitPointsConsumed(); + } + + @Override public final synchronized BigtableSource splitAtFraction(double fraction) { ByteKey splitKey; try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f0f809a0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index c09943b..cdbaaac 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -25,7 +25,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verifyNotNull; - import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -33,6 +32,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; @@ -490,6 +490,36 @@ public class BigtableIOTest { assertThat(displayData, hasDisplayItem("tableId", "fooTable")); } + @Test + public void testGetSplitPointsConsumed() throws Exception { + final String table = "TEST-TABLE"; + final int numRows = 100; + int splitPointsConsumed = 0; + + makeTableData(table, numRows); + + BigtableSource source = + new BigtableSource(service, table, null, ByteKeyRange.ALL_KEYS, null); + + BoundedReader<Row> reader = source.createReader(TestPipeline.testingPipelineOptions()); + + reader.start(); + // Started, 0 split points consumed + assertEquals("splitPointsConsumed starting", + splitPointsConsumed, reader.getSplitPointsConsumed()); + + // Split points consumed increases for each row read + while (reader.advance()) { + assertEquals("splitPointsConsumed advancing", + ++splitPointsConsumed, reader.getSplitPointsConsumed()); + } + + // Reader marked as done, 100 split points consumed + assertEquals("splitPointsConsumed done", numRows, reader.getSplitPointsConsumed()); + + reader.close(); + } + //////////////////////////////////////////////////////////////////////////////////////////// private static final String COLUMN_FAMILY_NAME = "family"; private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
