Repository: beam Updated Branches: refs/heads/master 6aed130cc -> 000378d6a
ByteKey: remove ByteString from public API, replace with ByteBuffer * BigtableIO: use ByteBuffer not ByteString * HBaseIO: replace ByteString with byte[] Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1efaa1e3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1efaa1e3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1efaa1e3 Branch: refs/heads/master Commit: 1efaa1e37976a801831f72b8a7c000bdcf6d3cd8 Parents: 6aed130 Author: Dan Halperin <[email protected]> Authored: Thu Apr 13 15:27:57 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Apr 25 08:50:49 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/range/ByteKey.java | 21 ++++++++-------- .../beam/sdk/io/range/ByteKeyRangeTest.java | 16 ++++++------ .../apache/beam/sdk/io/range/ByteKeyTest.java | 4 +-- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 12 ++++++--- .../io/gcp/bigtable/BigtableServiceImpl.java | 4 +-- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 ++++++--- sdks/java/io/hbase/pom.xml | 5 ---- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 26 +++++++++----------- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 16 ++++++------ 9 files changed, 58 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java index e4129ff..cd2377b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKey.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.protobuf.ByteString; import com.google.protobuf.ByteString.ByteIterator; import java.io.Serializable; +import java.nio.ByteBuffer; +import javax.annotation.Nonnull; /** * A class representing a key consisting of an array of bytes. Arbitrary-length @@ -43,10 +45,11 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable { public static final ByteKey EMPTY = ByteKey.of(); /** - * Creates a new {@link ByteKey} backed by the specified {@link ByteString}. + * Creates a new {@link ByteKey} backed by a copy of the data remaining in the specified + * {@link ByteBuffer}. */ - public static ByteKey of(ByteString value) { - return new ByteKey(value); + public static ByteKey copyFrom(ByteBuffer value) { + return new ByteKey(ByteString.copyFrom(value)); } /** @@ -55,7 +58,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable { * <p>Makes a copy of the underlying array. */ public static ByteKey copyFrom(byte[] bytes) { - return of(ByteString.copyFrom(bytes)); + return new ByteKey(ByteString.copyFrom(bytes)); } /** @@ -78,12 +81,10 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable { } /** - * Returns an immutable {@link ByteString} representing this {@link ByteKey}. - * - * <p>Does not copy. + * Returns a read-only {@link ByteBuffer} representing this {@link ByteKey}. */ - public ByteString getValue() { - return value; + public ByteBuffer getValue() { + return value.asReadOnlyByteBuffer(); } /** @@ -109,7 +110,7 @@ public final class ByteKey implements Comparable<ByteKey>, Serializable { * size. */ @Override - public int compareTo(ByteKey other) { + public int compareTo(@Nonnull ByteKey other) { checkNotNull(other, "other"); ByteIterator thisIt = value.iterator(); ByteIterator otherIt = other.value.iterator(); http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java index 40f6d8f..dd55970 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; -import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import org.junit.Test; @@ -375,19 +375,19 @@ public class ByteKeyRangeTest { /** Asserts the two keys are equal except trailing zeros. */ private static void assertEqualExceptPadding(ByteKey expected, ByteKey key) { - ByteString shortKey = expected.getValue(); - ByteString longKey = key.getValue(); - if (shortKey.size() > longKey.size()) { + ByteBuffer shortKey = expected.getValue(); + ByteBuffer longKey = key.getValue(); + if (shortKey.remaining() > longKey.remaining()) { shortKey = key.getValue(); longKey = expected.getValue(); } - for (int i = 0; i < shortKey.size(); ++i) { - if (shortKey.byteAt(i) != longKey.byteAt(i)) { + for (int i = 0; i < shortKey.remaining(); ++i) { + if (shortKey.get(i) != longKey.get(i)) { fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key)); } } - for (int j = shortKey.size(); j < longKey.size(); ++j) { - if (longKey.byteAt(j) != 0) { + for (int j = shortKey.remaining(); j < longKey.remaining(); ++j) { + if (longKey.get(j) != 0) { fail(String.format("Expected %s (up to trailing zeros), got %s", expected, key)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java index 1117ac7..6ddfa1d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyTest.java @@ -111,7 +111,7 @@ public class ByteKeyTest { assertTrue(String.format("Expected that %s is equal to itself.", left), eq); assertTrue( String.format("Expected that %s is equal to a copy of itself.", left), - left.equals(ByteKey.of(right.getValue()))); + left.equals(ByteKey.copyFrom(right.getValue()))); } else { assertFalse(String.format("Expected that %s is not equal to %s", left, right), eq); } @@ -128,7 +128,7 @@ public class ByteKeyTest { int collisions = 0; for (int i = 0; i < TEST_KEYS.length; ++i) { int left = TEST_KEYS[i].hashCode(); - int leftClone = ByteKey.of(TEST_KEYS[i].getValue()).hashCode(); + int leftClone = ByteKey.copyFrom(TEST_KEYS[i].getValue()).hashCode(); assertEquals( String.format("Expected same hash code for %s and a copy of itself", TEST_KEYS[i]), left, http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 7bba1a6..503be18 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -664,6 +664,10 @@ public class BigtableIO { /** Disallow construction of utility class. */ private BigtableIO() {} + private static ByteKey makeByteKey(ByteString key) { + return ByteKey.copyFrom(key.asReadOnlyByteBuffer()); + } + static class BigtableSource extends BoundedSource<Row> { public BigtableSource( SerializableFunction<PipelineOptions, BigtableService> serviceFactory, @@ -759,7 +763,7 @@ public class BigtableIO { long lastOffset = 0; ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); for (SampleRowKeysResponse response : sampleRowKeys) { - ByteKey responseEndKey = ByteKey.of(response.getRowKey()); + ByteKey responseEndKey = makeByteKey(response.getRowKey()); long responseOffset = response.getOffsetBytes(); checkState( responseOffset >= lastOffset, @@ -837,7 +841,7 @@ public class BigtableIO { // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a // filter or to sample on a given key range. for (SampleRowKeysResponse response : samples) { - ByteKey currentEndKey = ByteKey.of(response.getRowKey()); + ByteKey currentEndKey = makeByteKey(response.getRowKey()); long currentOffset = response.getOffsetBytes(); if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) { // Skip an empty region. @@ -950,7 +954,7 @@ public class BigtableIO { reader = service.createReader(getCurrentSource()); boolean hasRecord = reader.start() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey())) || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; @@ -967,7 +971,7 @@ public class BigtableIO { public boolean advance() throws IOException { boolean hasRecord = reader.advance() - && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey())) + && rangeTracker.tryReturnRecordAt(true, makeByteKey(reader.getCurrentRow().getKey())) || rangeTracker.markDone(); if (hasRecord) { ++recordsReturned; http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 1a4937c..90102c8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -117,8 +117,8 @@ class BigtableServiceImpl implements BigtableService { public boolean start() throws IOException { RowRange range = RowRange.newBuilder() - .setStartKeyClosed(source.getRange().getStartKey().getValue()) - .setEndKeyOpen(source.getRange().getEndKey().getValue()) + .setStartKeyClosed(ByteString.copyFrom(source.getRange().getStartKey().getValue())) + .setEndKeyOpen(ByteString.copyFrom(source.getRange().getEndKey().getValue())) .build(); RowSet rowSet = RowSet.newBuilder() .addRowRanges(range) http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index cf96b65..43957e3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -148,6 +148,10 @@ public class BigtableIOTest { bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE); } + private static ByteKey makeByteKey(ByteString key) { + return ByteKey.copyFrom(key.asReadOnlyByteBuffer()); + } + @Test public void testReadBuildsCorrectly() { BigtableIO.Read read = @@ -328,7 +332,7 @@ public class BigtableIOTest { @Override public boolean apply(@Nullable Row input) { verifyNotNull(input, "input"); - return range.containsKey(ByteKey.of(input.getKey())); + return range.containsKey(makeByteKey(input.getKey())); } })); } @@ -790,7 +794,7 @@ public class BigtableIOTest { public ByteKeyRange getTableRange(String tableId) { verifyTableExists(tableId); SortedMap<ByteString, ByteString> data = tables.get(tableId); - return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey())); + return ByteKeyRange.of(makeByteKey(data.firstKey()), makeByteKey(data.lastKey())); } public void createTable(String tableId) { @@ -890,7 +894,7 @@ public class BigtableIOTest { while (rows.hasNext()) { entry = rows.next(); if (!filter.apply(entry.getKey()) - || !source.getRange().containsKey(ByteKey.of(entry.getKey()))) { + || !source.getRange().containsKey(makeByteKey(entry.getKey()))) { // Does not match row filter or does not match source range. Skip. entry = null; continue; @@ -969,7 +973,7 @@ public class BigtableIOTest { private static final class ByteStringComparator implements Comparator<ByteString>, Serializable { @Override public int compare(ByteString o1, ByteString o2) { - return ByteKey.of(o1).compareTo(ByteKey.of(o2)); + return makeByteKey(o1).compareTo(makeByteKey(o2)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml index d8cb95f..0798efc 100644 --- a/sdks/java/io/hbase/pom.xml +++ b/sdks/java/io/hbase/pom.xml @@ -100,11 +100,6 @@ </dependency> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - - <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index ccdcef6..1c8afbd 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -20,8 +20,6 @@ package org.apache.beam.sdk.io.hbase; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.protobuf.ByteString; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -32,9 +30,8 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.TreeSet; import javax.annotation.Nullable; - import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.ByteStringCoder; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -52,7 +49,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; @@ -126,8 +122,8 @@ import org.slf4j.LoggerFactory; * <h3>Writing to HBase</h3> * * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a - * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the - * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an + * {@link PCollection PCollection<KV<byte[], Iterable<Mutation>>>}, where the + * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an * idempotent transformation to that row. * * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} @@ -135,7 +131,7 @@ import org.slf4j.LoggerFactory; * * <pre>{@code * Configuration configuration = ...; - * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; + * PCollection<KV<byte[], Iterable<Mutation>>> data = ...; * data.setCoder(HBaseIO.WRITE_CODER); * * data.apply("write", @@ -549,7 +545,7 @@ public class HBaseIO { * @see HBaseIO */ public static class Write - extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { + extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> { /** * Returns a new {@link HBaseIO.Write} that will write to the HBase instance @@ -578,13 +574,13 @@ public class HBaseIO { } @Override - public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) { + public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) { input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); return PDone.in(input.getPipeline()); } @Override - public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) { + public void validate(PCollection<KV<byte[], Iterable<Mutation>>> input) { checkArgument(serializableConfiguration != null, "Configuration not specified"); checkArgument(!tableId.isEmpty(), "Table ID not specified"); try (Connection connection = ConnectionFactory.createConnection( @@ -616,7 +612,7 @@ public class HBaseIO { private final String tableId; private final SerializableConfiguration serializableConfiguration; - private class HBaseWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { + private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> { public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) { @@ -645,7 +641,7 @@ public class HBaseIO { @ProcessElement public void processElement(ProcessContext ctx) throws Exception { - KV<ByteString, Iterable<Mutation>> record = ctx.element(); + KV<byte[], Iterable<Mutation>> record = ctx.element(); List<Mutation> mutations = new ArrayList<>(); for (Mutation mutation : record.getValue()) { mutations.add(mutation); @@ -687,6 +683,6 @@ public class HBaseIO { } } - public static final Coder<KV<ByteString, Iterable<Mutation>>> WRITE_CODER = - KvCoder.of(ByteStringCoder.of(), IterableCoder.of(HBaseMutationCoder.of())); + public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER = + KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of())); } http://git-wip-us.apache.org/repos/asf/beam/blob/1efaa1e3/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index c2410ea..bf8cb4b 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; -import com.google.protobuf.ByteString; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.Pipeline; @@ -284,7 +284,7 @@ public class HBaseIOTest { public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE"; - PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput = + PCollection<KV<byte[], Iterable<Mutation>>> emptyInput = p.apply(Create.empty(HBaseIO.WRITE_CODER)); // Exception will be thrown by write.validate() when write is applied. @@ -380,8 +380,8 @@ public class HBaseIOTest { // Beam helper methods /** Helper function to make a single row mutation to be written. */ - private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) { - ByteString rowKey = ByteString.copyFromUtf8(key); + private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) { + byte[] rowKey = key.getBytes(StandardCharsets.UTF_8); List<Mutation> mutations = new ArrayList<>(); mutations.add(makeMutation(key, value)); return KV.of(rowKey, (Iterable<Mutation>) mutations); @@ -389,17 +389,17 @@ public class HBaseIOTest { private static Mutation makeMutation(String key, String value) { - ByteString rowKey = ByteString.copyFromUtf8(key); - return new Put(rowKey.toByteArray()) + byte[] rowKey = key.getBytes(StandardCharsets.UTF_8); + return new Put(rowKey) .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value)) .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com")); } - private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) { + private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) { Put put = new Put(key.getBytes()); List<Mutation> mutations = new ArrayList<>(); mutations.add(put); - return KV.of(ByteString.copyFromUtf8(key), (Iterable<Mutation>) mutations); + return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations); } private void runReadTest(HBaseIO.Read read, List<Result> expected) {
