[BEAM-1540] Small refactorings for HBaseIO

Rename result to current + make some attributes final
Refactor SerializableScan to use get()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16a2bc0e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16a2bc0e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16a2bc0e

Branch: refs/heads/master
Commit: 16a2bc0ef860b02f5a8da01b0bd808973310c206
Parents: 715b95a
Author: Ismaël Mejía <[email protected]>
Authored: Thu Feb 23 10:23:04 2017 +0100
Committer: Dan Halperin <[email protected]>
Committed: Mon Feb 27 13:13:03 2017 -0800

----------------------------------------------------------------------
 sdks/java/io/hbase/pom.xml                      |  5 ++-
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java   | 32 ++++++++++----------
 .../beam/sdk/io/hbase/SerializableScan.java     | 10 ++++--
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java   |  2 +-
 4 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/pom.xml b/sdks/java/io/hbase/pom.xml
index f4a06a9..dfcca7a 100644
--- a/sdks/java/io/hbase/pom.xml
+++ b/sdks/java/io/hbase/pom.xml
@@ -196,9 +196,8 @@
     </dependency>

     <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>2.6</version>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>


http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 3c49db6..ed191cb 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -217,7 +217,7 @@ public class HBaseIO {
      */
     public Read withFilter(Filter filter) {
       checkNotNull(filter, "filter");
-      return withScan(serializableScan.getScan().setFilter(filter));
+      return withScan(serializableScan.get().setFilter(filter));
     }

     /**
@@ -229,7 +229,7 @@ public class HBaseIO {
       checkNotNull(keyRange, "keyRange");
       byte[] startRow = keyRange.getStartKey().getBytes();
       byte[] stopRow = keyRange.getEndKey().getBytes();
-      return withScan(serializableScan.getScan().setStartRow(startRow).setStopRow(stopRow));
+      return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
     }

     /**
@@ -279,7 +279,7 @@ public class HBaseIO {
       builder.add(DisplayData.item("configuration",
           serializableConfiguration.get().toString()));
       builder.add(DisplayData.item("tableId", tableId));
-      builder.addIfNotNull(DisplayData.item("scan", serializableScan.getScan().toString()));
+      builder.addIfNotNull(DisplayData.item("scan", serializableScan.get().toString()));
     }

     public String getTableId() {
@@ -294,18 +294,18 @@ public class HBaseIO {
      * Returns the range of keys that will be read from the table.
     */
    public ByteKeyRange getKeyRange() {
-      byte[] startRow = serializableScan.getScan().getStartRow();
-      byte[] stopRow = serializableScan.getScan().getStopRow();
+      byte[] startRow = serializableScan.get().getStartRow();
+      byte[] stopRow = serializableScan.get().getStopRow();
       return ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
     }

-    private SerializableConfiguration serializableConfiguration;
-    private String tableId;
-    private SerializableScan serializableScan;
+    private final SerializableConfiguration serializableConfiguration;
+    private final String tableId;
+    private final SerializableScan serializableScan;
   }

   static class HBaseSource extends BoundedSource<Result> {
-    private Read read;
+    private final Read read;
     @Nullable private Long estimatedSizeBytes;

     HBaseSource(Read read, @Nullable Long estimatedSizeBytes) {
@@ -318,7 +318,7 @@ public class HBaseIO {
       if (estimatedSizeBytes == null) {
         estimatedSizeBytes = estimateSizeBytes();
         LOG.debug("Estimated size {} bytes for table {} and scan {}", estimatedSizeBytes,
-            read.tableId, read.serializableScan.getScan());
+            read.tableId, read.serializableScan.get());
       }
       return estimatedSizeBytes;
     }
@@ -360,7 +360,7 @@ public class HBaseIO {
     }

     private List<HRegionLocation> getRegionLocations(Connection connection) throws Exception {
-      final Scan scan = read.serializableScan.getScan();
+      final Scan scan = read.serializableScan.get();
       byte[] startRow = scan.getStartRow();
       byte[] stopRow = scan.getStopRow();

@@ -390,7 +390,7 @@

     private List<HBaseSource> splitBasedOnRegions(List<HRegionLocation> regionLocations,
         int numSplits) throws Exception {
-      final Scan scan = read.serializableScan.getScan();
+      final Scan scan = read.serializableScan.get();
       byte[] startRow = scan.getStartRow();
       byte[] stopRow = scan.getStopRow();

@@ -478,7 +478,7 @@
     private Connection connection;
     private ResultScanner scanner;
     private Iterator<Result> iter;
-    private Result result;
+    private Result current;
     private long recordsReturned;

     HBaseReader(HBaseSource source) {
@@ -492,7 +492,7 @@
       connection = ConnectionFactory.createConnection(configuration);
       TableName tableName = TableName.valueOf(tableId);
       Table table = connection.getTable(tableName);
-      Scan scan = source.read.serializableScan.getScan();
+      Scan scan = source.read.serializableScan.get();
       scanner = table.getScanner(scan);
       iter = scanner.iterator();
       return advance();
@@ -500,14 +500,14 @@

     @Override
     public Result getCurrent() throws NoSuchElementException {
-      return result;
+      return current;
     }

     @Override
     public boolean advance() throws IOException {
       boolean hasRecord = iter.hasNext();
       if (hasRecord) {
-        result = iter.next();
+        current = iter.next();
         ++recordsReturned;
       }
       return hasRecord;


http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
index df575b0..f3bc7ac 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/SerializableScan.java
@@ -26,12 +26,18 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

 /**
- * This is just a wrapper class to serialize HBase {@link Scan}.
+ * This is just a wrapper class to serialize HBase {@link Scan} using Protobuf.
  */
 class SerializableScan implements Serializable {
   private transient Scan scan;

+  public SerializableScan() {
+  }
+
   public SerializableScan(Scan scan) {
+    if (scan == null) {
+      throw new NullPointerException("Scan must not be null.");
+    }
     this.scan = scan;
   }

@@ -43,7 +49,7 @@ class SerializableScan implements Serializable {
     scan = ProtobufUtil.toScan(ClientProtos.Scan.parseDelimitedFrom(in));
   }

-  public Scan getScan() {
+  public Scan get() {
     return scan;
   }
 }


http://git-wip-us.apache.org/repos/asf/beam/blob/16a2bc0e/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 1d49f9d..774e17e 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
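
To see the SerializableScan change in isolation, the standalone sketch below (not part of this commit) round-trips a Scan through Java serialization and reads it back with the renamed get() accessor. It assumes it is compiled in the org.apache.beam.sdk.io.hbase package, since the class is package-private, and the start/stop rows are made-up values; only the constructor, the Protobuf-backed readObject shown in the diff, and get() come from the change, while the matching writeObject is assumed to do the inverse of readObject.

package org.apache.beam.sdk.io.hbase;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical helper, not part of the commit: round-trips a SerializableScan. */
public class SerializableScanRoundTrip {
  public static void main(String[] args) throws Exception {
    // Assumed start/stop rows, for illustration only.
    Scan scan = new Scan().setStartRow(Bytes.toBytes("a")).setStopRow(Bytes.toBytes("z"));
    SerializableScan wrapped = new SerializableScan(scan);

    // Java-serialize the wrapper; its custom readObject (shown above) rebuilds the Scan
    // via HBase Protobuf, and writeObject is assumed to write the delimited form.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(wrapped);
    }
    SerializableScan copy;
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      copy = (SerializableScan) in.readObject();
    }

    // After this commit the wrapped Scan is retrieved with get() instead of getScan().
    System.out.println(Bytes.toString(copy.get().getStartRow())); // prints "a"
  }
}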

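For context on the Read transform whose accessors changed here, a minimal read-pipeline sketch follows. It is not from this commit: the table id, configuration, and filter are assumptions, and withConfiguration/withTableId come from the wider HBaseIO API rather than this diff; withFilter is the method shown above, which now derives its Scan via serializableScan.get().

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hbase.HBaseIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseReadSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create(); // assumed cluster settings
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // withFilter copies the configured Scan (via serializableScan.get().setFilter(filter),
    // per the withFilter hunk above); table name and row prefix are made up.
    PCollection<Result> rows =
        pipeline.apply(
            HBaseIO.read()
                .withConfiguration(conf)
                .withTableId("example-table")
                .withFilter(new PrefixFilter(Bytes.toBytes("row-"))));

    pipeline.run().waitUntilFinish();
  }
}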