Repository: eagle
Updated Branches:
  refs/heads/master 2160673c8 -> e38c19e13
[EAGLE-869] Fix MetricStreamPersist bug: no tuple pass on when batchSize > 1

https://issues.apache.org/jira/browse/EAGLE-869

Author: Hao Chen <[email protected]>

Closes #779 from haoch/FixMetricStreamPersist.

Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e38c19e1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e38c19e1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e38c19e1

Branch: refs/heads/master
Commit: e38c19e135ec649b0816fa016e24800ba0b4a7d6
Parents: 2160673
Author: Hao Chen <[email protected]>
Authored: Mon Jan 16 17:07:03 2017 +0800
Committer: Hao Chen <[email protected]>
Committed: Mon Jan 16 17:07:03 2017 +0800

----------------------------------------------------------------------
 .../app/messaging/MetricStreamPersist.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/eagle/blob/e38c19e1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index d656827..ba99911 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -41,7 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MetricStreamPersist extends BaseRichBolt {
+public class MetricStreamPersist extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class);
     public static final String METRIC_NAME_FIELD = "metricName";
 
@@ -75,24 +75,26 @@ public class MetricStreamPersist extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
+        GenericMetricEntity metricEntity = null;
         try {
-            GenericMetricEntity metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+            metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
             if (batchSize <= 1) {
                 GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity));
                 if (!response.isSuccess()) {
                     LOG.error("Service side error: {}", response.getException());
                     collector.reportError(new IllegalStateException(response.getException()));
-                } else {
-                    collector.emit(Collections.singletonList(metricEntity.getPrefix()));
-                    collector.ack(input);
                 }
             } else {
                 this.batchSender.send(metricEntity);
-                collector.ack(input);
             }
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
+        } finally {
+            if (metricEntity != null) {
+                collector.emit(Collections.singletonList(metricEntity.getPrefix()));
+            }
+            collector.ack(input);
         }
     }
 
@@ -127,12 +129,12 @@ public class MetricStreamPersist extends BaseRichBolt {
         @Override
         public GenericMetricEntity map(Map event) {
             String metricName = metricDefinition.getNameSelector().getMetricName(event);
-            Preconditions.checkNotNull(metricName,"Metric name is null");
+            Preconditions.checkNotNull(metricName, "Metric name is null");
             Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event);
             Preconditions.checkNotNull(timestamp, "Timestamp is null");
-            Map<String,String> tags = new HashMap<>();
-            for (String dimensionField: metricDefinition.getDimensionFields()) {
-                Preconditions.checkNotNull(dimensionField,"Dimension field name is null");
+            Map<String, String> tags = new HashMap<>();
+            for (String dimensionField : metricDefinition.getDimensionFields()) {
+                Preconditions.checkNotNull(dimensionField, "Dimension field name is null");
                 tags.put(dimensionField, (String) event.get(dimensionField));
             }
 
@@ -141,7 +143,7 @@ public class MetricStreamPersist extends BaseRichBolt {
                 values = new double[] {(double) event.get(metricDefinition.getValueField())};
             } else {
                 LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event);
-                values = new double[]{0};
+                values = new double[] {0};
             }
             GenericMetricEntity entity = new GenericMetricEntity();
