[
https://issues.apache.org/jira/browse/STORM-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14646073#comment-14646073
]
ASF GitHub Bot commented on STORM-960:
--------------------------------------
Github user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/653#discussion_r35761875
--- Diff:
external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
@@ -104,16 +106,30 @@ public void execute(Tuple tuple) {
enableHeartBeatOnAllWriters();
}
writer.write(options.getMapper().mapRecord(tuple));
- currentBatchSize++;
- if(currentBatchSize >= options.getBatchSize()) {
+
+ tupleBatch.add(tuple);
+ if(tupleBatch.size() >= options.getBatchSize()) {
flushAllWriters();
- currentBatchSize = 0;
+ LOG.info("acknowledging tuples after writers flushed ");
+ for(Tuple t : tupleBatch)
+ collector.ack(t);
+ tupleBatch.clear();
}
- collector.ack(tuple);
+
} catch(Exception e) {
this.collector.reportError(e);
collector.fail(tuple);
- flushAndCloseWriters();
+ try {
+ flushAndCloseWriters();
+ LOG.info("acknowledging tuples after writers flushed and closed");
+ for (Tuple t : tupleBatch)
+ collector.ack(t);
--- End diff ---
shouldn't we be calling collector.fail here?
> Hive-Bolt can lose tuples when flushing data
> --------------------------------------------
>
> Key: STORM-960
> URL: https://issues.apache.org/jira/browse/STORM-960
> Project: Apache Storm
> Issue Type: Improvement
> Components: external
> Reporter: Aaron Dossett
> Assignee: Aaron Dossett
> Priority: Minor
>
> In HiveBolt's execute method, tuples are ack'd as they are received. Once a
> batch size of tuples has been received, the writers are flushed. However, if
> the flush fails, only the most recent tuple is marked as failed; all prior
> tuples in the batch have already been ack'd and will not be replayed. This
> creates a window for data loss.
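> The fix discussed in the PR is to defer acking until after a successful
> flush, and (per the review comment) to fail the pending batch rather than
> ack it when the flush or close fails. A minimal standalone sketch of that
> pattern follows; the `BatchAckSketch` class, its `Collector` interface, and
> the no-op `write`/`flushAllWriters` methods are hypothetical stand-ins for
> Storm's `OutputCollector` and the HiveWriter machinery, not the actual
> HiveBolt code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batch-aware ack/fail handling, assuming a
// collector with ack/fail semantics like Storm's OutputCollector.
class BatchAckSketch {
    interface Collector {
        void ack(String tuple);
        void fail(String tuple);
    }

    private final List<String> tupleBatch = new ArrayList<>();
    private final int batchSize;
    private final Collector collector;

    BatchAckSketch(int batchSize, Collector collector) {
        this.batchSize = batchSize;
        this.collector = collector;
    }

    void execute(String tuple) {
        try {
            write(tuple);                     // stand-in for writer.write(...)
            tupleBatch.add(tuple);
            if (tupleBatch.size() >= batchSize) {
                flushAllWriters();            // may throw
                // Ack only after the flush succeeded, so a flush failure
                // leaves every tuple in the batch eligible for replay.
                for (String t : tupleBatch) {
                    collector.ack(t);
                }
                tupleBatch.clear();
            }
        } catch (Exception e) {
            // Fail the whole pending batch (and the current tuple, if the
            // write failed before it was batched) so the spout replays all.
            if (!tupleBatch.contains(tuple)) {
                collector.fail(tuple);
            }
            for (String t : tupleBatch) {
                collector.fail(t);
            }
            tupleBatch.clear();
        }
    }

    void write(String tuple) { /* no-op stand-in */ }
    void flushAllWriters() { /* no-op stand-in */ }
}
```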
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)