STREAMS-197 | added util function and refactored
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0006a044 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0006a044 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0006a044 Branch: refs/heads/master Commit: 0006a044f001a68ba8ceb481ff4fbcb0ba0de270 Parents: f40ce9b Author: Ryan Ebanks <[email protected]> Authored: Tue Oct 21 17:26:35 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Tue Oct 21 17:26:35 2014 -0500 ---------------------------------------------------------------------- streams-runtimes/streams-runtime-local/pom.xml | 6 +++--- .../local/counters/DatumStatusCounter.java | 15 +++++---------- .../local/counters/StreamsTaskCounter.java | 10 ++-------- .../org/apache/streams/util/ComponentUtils.java | 18 ++++++++++++++++++ 4 files changed, 28 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/pom.xml ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/pom.xml b/streams-runtimes/streams-runtime-local/pom.xml index 0d9138d..9ee8d5b 100644 --- a/streams-runtimes/streams-runtime-local/pom.xml +++ b/streams-runtimes/streams-runtime-local/pom.xml @@ -46,17 +46,17 @@ <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-core</artifactId> - <version>0.1-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-util</artifactId> - <version>0.1-SNAPSHOT</version> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-pojo</artifactId> - <version>0.1-SNAPSHOT</version> + <version>${project.version}</version> </dependency> 
<dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java index 88c3a6f..acada71 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java @@ -18,6 +18,7 @@ package org.apache.streams.local.counters; import net.jcip.annotations.ThreadSafe; +import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +41,7 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{ public DatumStatusCounter(String id) { this.failed = new AtomicLong(0); this.passed = new AtomicLong(0); - try { - ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, name); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - LOGGER.error("Failed to register MXBean : {}", e); - throw new RuntimeException(e); - } + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this); } public void incrementFailedCount() { @@ -70,10 +64,11 @@ public class DatumStatusCounter implements DatumStatusCounterMXBean{ @Override public double getFailRate() { double failed = this.failed.get(); - if(failed == 0.0 && this.passed.get() == 0) { + double passed = this.passed.get(); + if(failed == 0.0 && passed == 0) { return 0.0; } - return failed 
/ (this.passed.get() + failed); + return failed / (passed + failed); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java index 8801df2..68c6364 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java @@ -19,6 +19,7 @@ package org.apache.streams.local.counters; import net.jcip.annotations.GuardedBy; import net.jcip.annotations.ThreadSafe; +import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,14 +53,7 @@ public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ this.errors = new AtomicLong(0); this.totalTime = new AtomicLong(0); this.maxTime = -1; - try { - ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - mbs.registerMBean(this, name); - } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { - LOGGER.error("Failed to register MXBean : {}", e); - throw new RuntimeException(e); - } + ComponentUtils.registerLocalMBean(String.format(NAME_TEMPLATE, id), this); } /** http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0006a044/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java ---------------------------------------------------------------------- diff --git 
a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java index 9f3c480..bb65c4c 100644 --- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java +++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java @@ -22,6 +22,8 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.*; +import java.lang.management.ManagementFactory; import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -105,4 +107,20 @@ public class ComponentUtils { } } + /** + * Attempts to register an object with local MBeanServer. Throws runtime exception on errors. + * @param name name to register bean with + * @param mbean mbean to register + */ + public static <V> void registerLocalMBean(String name, V mbean) { + try { + ObjectName objectName = new ObjectName(name); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(mbean, objectName); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + }
