Repository: beam Updated Branches: refs/heads/master 2304972c5 -> eae0d05bd
[BEAM-2411] Make the write transform of HBaseIO simpler Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ec59a08 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ec59a08 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ec59a08 Branch: refs/heads/master Commit: 1ec59a08a3fab5ac0918d7f1a33b82427957b630 Parents: 2304972 Author: Ismaël Mejía <[email protected]> Authored: Mon Jun 5 23:48:38 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Tue Jun 20 09:04:00 2017 +0200 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/hbase/HBaseIO.java | 45 +++++++------------- .../apache/beam/sdk/io/hbase/HBaseIOTest.java | 37 +++++++--------- 2 files changed, 32 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 849873c..626fad9 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -31,10 +31,7 @@ import java.util.Set; import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.hadoop.SerializableConfiguration; import org.apache.beam.sdk.io.range.ByteKey; @@ -44,7 +41,6 @@ import 
org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -122,16 +118,15 @@ import org.slf4j.LoggerFactory; * <h3>Writing to HBase</h3> * * <p>The HBase sink executes a set of row mutations on a single table. It takes as input a - * {@link PCollection PCollection<KV<byte[], Iterable<Mutation>>>}, where the - * {@code byte[]} is the key of the row being mutated, and each {@link Mutation} represents an - * idempotent transformation to that row. + * {@link PCollection PCollection<Mutation>}, where each {@link Mutation} represents an + * idempotent transformation on a row. * * <p>To configure a HBase sink, you must supply a table id and a {@link Configuration} * to identify the HBase instance, for example: * * <pre>{@code * Configuration configuration = ...; - * PCollection<KV<byte[], Iterable<Mutation>>> data = ...; + * PCollection<Mutation> data = ...; * data.setCoder(HBaseIO.WRITE_CODER); * * data.apply("write", @@ -545,9 +540,7 @@ public class HBaseIO { * * @see HBaseIO */ - public static class Write - extends PTransform<PCollection<KV<byte[], Iterable<Mutation>>>, PDone> { - + public static class Write extends PTransform<PCollection<Mutation>, PDone> { /** * Returns a new {@link HBaseIO.Write} that will write to the HBase instance * indicated by the given Configuration, and using any other specified customizations. 
@@ -575,7 +568,7 @@ public class HBaseIO { } @Override - public PDone expand(PCollection<KV<byte[], Iterable<Mutation>>> input) { + public PDone expand(PCollection<Mutation> input) { input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); return PDone.in(input.getPipeline()); } @@ -613,7 +606,7 @@ public class HBaseIO { private final String tableId; private final SerializableConfiguration serializableConfiguration; - private class HBaseWriterFn extends DoFn<KV<byte[], Iterable<Mutation>>, Void> { + private class HBaseWriterFn extends DoFn<Mutation, Void> { public HBaseWriterFn(String tableId, SerializableConfiguration serializableConfiguration) { @@ -624,31 +617,27 @@ public class HBaseIO { @Setup public void setup() throws Exception { - Configuration configuration = this.serializableConfiguration.get(); - connection = ConnectionFactory.createConnection(configuration); + connection = ConnectionFactory.createConnection(serializableConfiguration.get()); + } - TableName tableName = TableName.valueOf(tableId); + @StartBundle + public void startBundle(StartBundleContext c) throws IOException { BufferedMutatorParams params = - new BufferedMutatorParams(tableName); + new BufferedMutatorParams(TableName.valueOf(tableId)); mutator = connection.getBufferedMutator(params); - recordsWritten = 0; } @ProcessElement - public void processElement(ProcessContext ctx) throws Exception { - KV<byte[], Iterable<Mutation>> record = ctx.element(); - List<Mutation> mutations = new ArrayList<>(); - for (Mutation mutation : record.getValue()) { - mutations.add(mutation); - ++recordsWritten; - } - mutator.mutate(mutations); + public void processElement(ProcessContext c) throws Exception { + mutator.mutate(c.element()); + ++recordsWritten; } @FinishBundle public void finishBundle() throws Exception { mutator.flush(); + LOG.debug("Wrote {} records", recordsWritten); } @Teardown @@ -661,7 +650,6 @@ public class HBaseIO { connection.close(); connection = null; } - 
LOG.debug("Wrote {} records", recordsWritten); } @Override @@ -679,6 +667,5 @@ public class HBaseIO { } } - public static final Coder<KV<byte[], Iterable<Mutation>>> WRITE_CODER = - KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(HBaseMutationCoder.of())); + public static final Coder<Mutation> WRITE_CODER = HBaseMutationCoder.of(); } http://git-wip-us.apache.org/repos/asf/beam/blob/1ec59a08/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index 005770d..d081139 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -292,15 +291,17 @@ public class HBaseIOTest { final String table = "table"; final String key = "key"; final String value = "value"; + final int numMutations = 100; createTable(table); - p.apply("single row", Create.of(makeWrite(key, value)).withCoder(HBaseIO.WRITE_CODER)) - .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + p.apply("multiple rows", Create.of(makeMutations(key, value, numMutations)) + .withCoder(HBaseIO.WRITE_CODER)) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); p.run().waitUntilFinish(); List<Result> results = readTable(table, new Scan()); - assertEquals(1, results.size()); + 
assertEquals(numMutations, results.size()); } /** Tests that when writing to a non-existent table, the write fails. */ @@ -308,10 +309,8 @@ public class HBaseIOTest { public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE-DOES-NOT-EXIST"; - PCollection<KV<byte[], Iterable<Mutation>>> emptyInput = - p.apply(Create.empty(HBaseIO.WRITE_CODER)); - - emptyInput.apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + p.apply(Create.empty(HBaseIO.WRITE_CODER)) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); // Exception will be thrown by write.validate() when write is applied. thrown.expect(IllegalArgumentException.class); @@ -326,7 +325,7 @@ public class HBaseIOTest { final String key = "KEY"; createTable(table); - p.apply(Create.of(makeBadWrite(key)).withCoder(HBaseIO.WRITE_CODER)) + p.apply(Create.of(makeBadMutation(key)).withCoder(HBaseIO.WRITE_CODER)) .apply(HBaseIO.write().withConfiguration(conf).withTableId(table)); thrown.expect(Pipeline.PipelineExecutionException.class); @@ -405,26 +404,22 @@ public class HBaseIOTest { // Beam helper methods /** Helper function to make a single row mutation to be written. 
*/ - private static KV<byte[], Iterable<Mutation>> makeWrite(String key, String value) { - byte[] rowKey = key.getBytes(StandardCharsets.UTF_8); + private static Iterable<Mutation> makeMutations(String key, String value, int numMutations) { List<Mutation> mutations = new ArrayList<>(); - mutations.add(makeMutation(key, value)); - return KV.of(rowKey, (Iterable<Mutation>) mutations); + for (int i = 0; i < numMutations; i++) { + mutations.add(makeMutation(key + i, value)); + } + return mutations; } - private static Mutation makeMutation(String key, String value) { - byte[] rowKey = key.getBytes(StandardCharsets.UTF_8); - return new Put(rowKey) + return new Put(key.getBytes(StandardCharsets.UTF_8)) .addColumn(COLUMN_FAMILY, COLUMN_NAME, Bytes.toBytes(value)) .addColumn(COLUMN_FAMILY, COLUMN_EMAIL, Bytes.toBytes(value + "@email.com")); } - private static KV<byte[], Iterable<Mutation>> makeBadWrite(String key) { - Put put = new Put(key.getBytes()); - List<Mutation> mutations = new ArrayList<>(); - mutations.add(put); - return KV.of(key.getBytes(StandardCharsets.UTF_8), (Iterable<Mutation>) mutations); + private static Mutation makeBadMutation(String key) { + return new Put(key.getBytes()); } private void runReadTest(HBaseIO.Read read, List<Result> expected) {
