Repository: eagle
Updated Branches:
  refs/heads/master 2160673c8 -> e38c19e13


[EAGLE-869] Fix MetricStreamPersist bug: no tuple is passed on when batchSize > 1

https://issues.apache.org/jira/browse/EAGLE-869

Author: Hao Chen <[email protected]>

Closes #779 from haoch/FixMetricStreamPersist.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e38c19e1
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e38c19e1
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e38c19e1

Branch: refs/heads/master
Commit: e38c19e135ec649b0816fa016e24800ba0b4a7d6
Parents: 2160673
Author: Hao Chen <[email protected]>
Authored: Mon Jan 16 17:07:03 2017 +0800
Committer: Hao Chen <[email protected]>
Committed: Mon Jan 16 17:07:03 2017 +0800

----------------------------------------------------------------------
 .../app/messaging/MetricStreamPersist.java      | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/e38c19e1/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
index d656827..ba99911 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java
@@ -41,7 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public class MetricStreamPersist extends BaseRichBolt  {
+public class MetricStreamPersist extends BaseRichBolt {
     private static final Logger LOG = 
LoggerFactory.getLogger(MetricStreamPersist.class);
     public static final String METRIC_NAME_FIELD = "metricName";
 
@@ -75,24 +75,26 @@ public class MetricStreamPersist extends BaseRichBolt  {
 
     @Override
     public void execute(Tuple input) {
+        GenericMetricEntity metricEntity = null;
         try {
-            GenericMetricEntity metricEntity = 
this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
+            metricEntity = 
this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1());
             if (batchSize <= 1) {
                 GenericServiceAPIResponseEntity<String> response = 
this.client.create(Collections.singletonList(metricEntity));
                 if (!response.isSuccess()) {
                     LOG.error("Service side error: {}", 
response.getException());
                     collector.reportError(new 
IllegalStateException(response.getException()));
-                } else {
-                    
collector.emit(Collections.singletonList(metricEntity.getPrefix()));
-                    collector.ack(input);
                 }
             } else {
                 this.batchSender.send(metricEntity);
-                collector.ack(input);
             }
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
+        } finally {
+            if (metricEntity != null) {
+                
collector.emit(Collections.singletonList(metricEntity.getPrefix()));
+            }
+            collector.ack(input);
         }
     }
 
@@ -127,12 +129,12 @@ public class MetricStreamPersist extends BaseRichBolt  {
         @Override
         public GenericMetricEntity map(Map event) {
             String metricName = 
metricDefinition.getNameSelector().getMetricName(event);
-            Preconditions.checkNotNull(metricName,"Metric name is null");
+            Preconditions.checkNotNull(metricName, "Metric name is null");
             Long timestamp = 
metricDefinition.getTimestampSelector().getTimestamp(event);
             Preconditions.checkNotNull(timestamp, "Timestamp is null");
-            Map<String,String> tags = new HashMap<>();
-            for (String dimensionField: metricDefinition.getDimensionFields()) 
{
-                Preconditions.checkNotNull(dimensionField,"Dimension field 
name is null");
+            Map<String, String> tags = new HashMap<>();
+            for (String dimensionField : 
metricDefinition.getDimensionFields()) {
+                Preconditions.checkNotNull(dimensionField, "Dimension field 
name is null");
                 tags.put(dimensionField, (String) event.get(dimensionField));
             }
 
@@ -141,7 +143,7 @@ public class MetricStreamPersist extends BaseRichBolt  {
                 values = new double[] {(double) 
event.get(metricDefinition.getValueField())};
             } else {
                 LOG.warn("Event has no value field '{}': {}, use 0 by 
default", metricDefinition.getValueField(), event);
-                values = new double[]{0};
+                values = new double[] {0};
             }
 
             GenericMetricEntity entity = new GenericMetricEntity();

Reply via email to