Repository: falcon Updated Branches: refs/heads/master 772e38779 -> c980aa800
FALCON-2173 BacklogEmitterService should emit 0 for process which don⦠â¦ot have backlog Author: Praveen Adlakha <[email protected]> Reviewers: @pallavi-rao Closes #291 from PraveenAdlakha/2173 and squashes the following commits: d53a4e3 [Praveen Adlakha] test case fixed d9e6771 [Praveen Adlakha] else condition changed 5c687bc [Praveen Adlakha] comments addressed 738ad4e [Praveen Adlakha] FALCON-2173 BacklogEmitterService should emit 0 for process which donot have backlog Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c980aa80 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c980aa80 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c980aa80 Branch: refs/heads/master Commit: c980aa800afdcf38fcdafe004586abb3bafd4732 Parents: 772e387 Author: Praveen Adlakha <[email protected]> Authored: Wed Oct 26 09:56:55 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Oct 26 09:56:55 2016 +0530 ---------------------------------------------------------------------- .../service/BacklogMetricEmitterService.java | 52 ++++++++++++++------ .../falcon/service/EntitySLAAlertService.java | 2 +- .../service/EntitySLAMonitoringService.java | 4 +- .../java/org/apache/falcon/util/MetricInfo.java | 2 + 4 files changed, 42 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java index 3aa2155..b01b181 100644 --- a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java +++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.Clusters; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.jdbc.BacklogMetricStore; import org.apache.falcon.metrics.MetricNotificationService; @@ -103,7 +104,7 @@ public final class BacklogMetricEmitterService implements FalconService, @Override public void onAdd(Entity entity) throws FalconException{ - //DO Nothing + addToBacklog(entity); } @Override @@ -148,12 +149,25 @@ public final class BacklogMetricEmitterService implements FalconService, for(Cluster cluster : process.getClusters().getClusters()){ dropMetric(cluster.getName(), process); } + }else{ + addToBacklog(newEntity); } } @Override public void onReload(Entity entity) throws FalconException{ - // Do Nothing + addToBacklog(entity); + } + + public void addToBacklog(Entity entity) { + if (entity.getEntityType() != EntityType.PROCESS) { + return; + } + Process process = (Process) entity; + if (process.getSla() == null){ + return; + } + entityBacklogs.putIfAbsent(entity, Collections.synchronizedList(new ArrayList<MetricInfo>())); } @Override @@ -306,21 +320,29 @@ public final class BacklogMetricEmitterService implements FalconService, MetricInfo metricInfo = null; HashMap<String, Long> backLogsCluster = new HashMap<>(); synchronized (metrics) { - long currentTime = System.currentTimeMillis(); - Iterator iter = metrics.iterator(); - while (iter.hasNext()) { - try { - metricInfo = (MetricInfo) iter.next(); - long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime(); - long backlog = backLogsCluster.containsKey(metricInfo.getCluster()) - ? backLogsCluster.get(metricInfo.getCluster()) : 0; - backlog += (currentTime - time); - backLogsCluster.put(metricInfo.getCluster(), backlog); - } catch (ParseException e) { - LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime()); + if (metrics.isEmpty()){ + Process process = (Process)entityObj; + Clusters clusters = process.getClusters(); + for (Cluster cluster : clusters.getClusters()){ + publishBacklog(process, cluster.getName(), 0L); + } + }else{ + long currentTime = System.currentTimeMillis(); + Iterator iter = metrics.iterator(); + while (iter.hasNext()) { + try { + metricInfo = (MetricInfo) iter.next(); + long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime(); + long backlog = backLogsCluster.containsKey(metricInfo.getCluster()) + ? backLogsCluster.get(metricInfo.getCluster()) : 0; + backlog += (currentTime - time); + backLogsCluster.put(metricInfo.getCluster(), backlog); + } catch (ParseException e) { + LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime()); + } } - } + } } org.apache.falcon.entity.v0.process.Process process = (Process) entityObj; http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java index c4069dd..837a170 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java @@ -163,7 +163,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList ) throws FalconException { for (EntitySLAListener listener : listeners) { listener.highSLAMissed(entityName, clusterName, entityType, nominalTime); - store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name()); } + store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name()); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java index 7ff9309..00e116b 100644 --- a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java +++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java @@ -418,8 +418,8 @@ public final class EntitySLAMonitoringService implements ConfigurationChangeList return; } for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllPendingInstances()){ - for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(), - pendingInstanceBean.getClusterName(), entityType)) { + for (Date instanceTime : MONITORING_JDBC_STATE_STORE.getNominalInstances( + pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(), entityType)) { boolean status = checkEntityInstanceAvailability(pendingInstanceBean.getEntityName(), pendingInstanceBean.getClusterName(), instanceTime, entityType); if (status) { http://git-wip-us.apache.org/repos/asf/falcon/blob/c980aa80/prism/src/main/java/org/apache/falcon/util/MetricInfo.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java index 694bb87..59c2bfd 100644 --- a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java +++ b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java @@ -25,6 +25,8 @@ public class MetricInfo { private String nominalTime; private String cluster; + public MetricInfo(){} + public MetricInfo(String nominalTimeStr, String clusterName) { this.nominalTime = nominalTimeStr; this.cluster = clusterName;
