[ https://issues.apache.org/jira/browse/STORM-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14942104#comment-14942104 ]
ASF GitHub Bot commented on STORM-1079:
---------------------------------------
Github user Parth-Brahmbhatt commented on a diff in the pull request:
https://github.com/apache/storm/pull/772#discussion_r41083829
--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java ---
@@ -53,21 +61,62 @@ public HBaseBolt withConfigKey(String configKey) {
return this;
}
+ public HBaseBolt withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
+ this.flushIntervalSecs = flushIntervalSecs;
+ return this;
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (flushIntervalSecs > 0) {
+ LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+ }
+
+ return conf;
+ }
+
+
@Override
public void execute(Tuple tuple) {
- byte[] rowKey = this.mapper.rowKey(tuple);
- ColumnList cols = this.mapper.columns(tuple);
- List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ boolean flush = false;
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
+ flush = true;
+ } else {
+ byte[] rowKey = this.mapper.rowKey(tuple);
+ ColumnList cols = this.mapper.columns(tuple);
+ List<Mutation> mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ batchMutations.addAll(mutations);
+ tupleBatch.add(tuple);
+ if (tupleBatch.size() >= batchSize)
--- End diff --
I can vouch for the existence of such a community :-).
> Batch Puts to HBase
> -------------------
>
> Key: STORM-1079
> URL: https://issues.apache.org/jira/browse/STORM-1079
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-hbase
> Reporter: Sriharsha Chintalapani
> Assignee: Sriharsha Chintalapani
> Fix For: 0.11.0
>
>