Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1239#discussion_r226369605
--- Diff:
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
---
@@ -118,12 +118,15 @@ public void commit(BulkWriterResponse response) {
public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
- MetronError error = new MetronError()
- .withSensorType(Collections.singleton(sensorType))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR)
- .withThrowable(e);
- tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
- handleError(tuples, error);
+ tuples.forEach(t -> {
--- End diff --
It looks like this will ack the same tuples repeatedly. If 500 messages in
a batch fail, we will ack all 500 of them 500 times, which is 250,000 acks in total.
There is also the nuisance that we report the error to the collector for
each and every failed message instead of once for the batch. There is
only one `Throwable` to report, so we should report it once.
We may need something like this.
```suggestion
    // emit one error for each failed message
    tuples.forEach(t -> {
      MetronError error = new MetronError()
              .withSensorType(Collections.singleton(sensorType))
              .withErrorType(Constants.ErrorType.INDEXING_ERROR)
              .withThrowable(e)
              .addRawMessage(messageGetStrategy.get(t));
      collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
      collector.ack(t);
    });
    // there is only one error to report for all of the failed tuples
    collector.reportError(e);
  }
```
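To make the ack arithmetic concrete, here is a minimal standalone sketch. It does not use Storm or Metron at all: `CountingCollector`, `AckCountSketch`, and the method names are hypothetical stand-ins, and the nested loop assumes the new `forEach` handles the whole batch once per tuple, which is how I read the change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the real collector; it only counts ack calls.
class CountingCollector {
  int acks = 0;

  void ack(String tuple) {
    acks++;
  }
}

class AckCountSketch {

  // Assumed shape of the concern: for every failed tuple,
  // the whole batch is handled (and therefore acked) again.
  static int acksWhenWholeBatchHandledPerTuple(List<String> tuples) {
    CountingCollector collector = new CountingCollector();
    for (String ignored : tuples) {
      for (String t : tuples) {
        collector.ack(t);
      }
    }
    return collector.acks;
  }

  // Shape of the suggestion: each failed tuple is acked exactly once.
  static int acksWhenEachTupleHandledOnce(List<String> tuples) {
    CountingCollector collector = new CountingCollector();
    for (String t : tuples) {
      collector.ack(t);
    }
    return collector.acks;
  }

  // Builds a batch of n placeholder tuples.
  static List<String> batchOf(int n) {
    List<String> batch = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      batch.add("tuple-" + i);
    }
    return batch;
  }

  public static void main(String[] args) {
    List<String> batch = batchOf(500);
    System.out.println(acksWhenWholeBatchHandledPerTuple(batch)); // prints 250000
    System.out.println(acksWhenEachTupleHandledOnce(batch));      // prints 500
  }
}
```

The ack count grows quadratically with batch size in the first case and linearly in the second, which is why acking inside a single per-tuple loop seems like the safer shape.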
---