Repository: storm Updated Branches: refs/heads/master 7ab4f2418 -> 8c4eebf27
STORM-519 adding tuple as an input param to HBaseValueMapper so user can choose to emit fields from original tuple in addition to lookup result. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f87bb313 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f87bb313 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f87bb313 Branch: refs/heads/master Commit: f87bb3135b4dcfd2b73689a35bf0b73a6cc5bc75 Parents: 1babd83 Author: Parth Brahmbhatt <[email protected]> Authored: Mon Oct 6 15:21:28 2014 -0700 Committer: Parth Brahmbhatt <[email protected]> Committed: Mon Oct 6 15:21:28 2014 -0700 ---------------------------------------------------------------------- external/storm-hbase/pom.xml | 2 +- .../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java | 2 +- .../org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java | 4 +++- .../java/org/apache/storm/hbase/trident/state/HBaseState.java | 6 ++++-- .../org/apache/storm/hbase/topology/WordCountValueMapper.java | 3 ++- 5 files changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index f976cea..a492164 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -21,7 +21,7 @@ <parent> <artifactId>storm</artifactId> <groupId>org.apache.storm</groupId> - <version>0.9.3-incubating-SNAPSHOT</version> + <version>0.9.3-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java ---------------------------------------------------------------------- diff --git 
a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java index 12263a6..c6838be 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java @@ -66,7 +66,7 @@ public class HBaseLookupBolt extends AbstractHBaseBolt { try { Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0]; - for(Values values : rowToTupleMapper.toValues(result)) { + for(Values values : rowToTupleMapper.toValues(tuple, result)) { this.collector.emit(values); } this.collector.ack(tuple); http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java index 39ce47a..bc38b83 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseValueMapper.java @@ -18,6 +18,7 @@ package org.apache.storm.hbase.bolt.mapper; import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Values; import org.apache.hadoop.hbase.client.Result; @@ -27,11 +28,12 @@ import java.util.List; public interface HBaseValueMapper extends Serializable { /** * + * @param input tuple. * @param result HBase lookup result instance. * @return list of values that should be emitted by the lookup bolt. 
* @throws Exception */ - public List<Values> toValues(Result result) throws Exception; + public List<Values> toValues(ITuple input, Result result) throws Exception; /** * declares the output fields for the lookup bolt. http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java index 66decf2..7b31fad 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java @@ -149,8 +149,10 @@ public class HBaseState implements State { try { Result[] results = hBaseClient.batchGet(gets); - for(Result result : results) { - List<Values> values = options.rowToStormValueMapper.toValues(result); + for(int i = 0; i < results.length; i++) { + Result result = results[i]; + TridentTuple tuple = tridentTuples.get(i); + List<Values> values = options.rowToStormValueMapper.toValues(tuple, result); batchRetrieveResult.add(values); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/storm/blob/f87bb313/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java index dd2ae20..2463085 100644 --- a/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/topology/WordCountValueMapper.java @@ -20,6 +20,7 @@ 
package org.apache.storm.hbase.topology; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; import backtype.storm.tuple.Values; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -51,7 +52,7 @@ import java.util.List; public class WordCountValueMapper implements HBaseValueMapper { @Override - public List<Values> toValues(Result result) throws Exception { + public List<Values> toValues(ITuple tuple, Result result) throws Exception { List<Values> values = new ArrayList<Values>(); Cell[] cells = result.rawCells(); for(Cell cell : cells) {
