Repository: incubator-streams
Updated Branches:
  refs/heads/master 849a2e8f9 -> 9b3227141


Created MXBean for counts and deprecated DatumStatusCounter and 
DatumStatusCountable in core


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c030d439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c030d439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c030d439

Branch: refs/heads/master
Commit: c030d4393ab84fbd1cf484bf8ea06a7902a26964
Parents: 2a635df
Author: Ryan Ebanks <[email protected]>
Authored: Fri Oct 17 17:13:51 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Fri Oct 17 17:13:51 2014 -0500

----------------------------------------------------------------------
 .../streams/core/DatumStatusCountable.java      |   1 +
 .../apache/streams/core/DatumStatusCounter.java |   2 +-
 .../local/counters/DatumStatusCounter.java      |  88 +++++++++++
 .../counters/DatumStatusCounterMXBean.java      |  43 +++++
 .../local/counters/StreamsTaskCounter.java      | 101 ++++++++++++
 .../counters/StreamsTaskCounterMXBean.java      |  53 +++++++
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  17 ++
 .../local/counters/DatumStatusCounterTest.java  | 134 ++++++++++++++++
 .../local/counters/StreamsTaskCounterTest.java  | 158 +++++++++++++++++++
 9 files changed, 596 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
index 6a9da4d..3abf30d 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -21,6 +21,7 @@ package org.apache.streams.core;
 /**
  * Created by steveblackmon on 3/24/14.
  */
+@Deprecated
 public interface DatumStatusCountable {
 
     public DatumStatusCounter getDatumStatusCounter();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 1bba688..2758ada 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -17,7 +17,7 @@
  */
 
 package org.apache.streams.core;
-
+@Deprecated
 public class DatumStatusCounter
 {
     private volatile int attempted = 0;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
new file mode 100644
index 0000000..88c3a6f
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Counter of passed and failed datums that registers itself with the platform MBean server
+ * so the counts can be read through JMX. Counts are held in {@link AtomicLong}s.
+ */
+@ThreadSafe
+public class DatumStatusCounter implements DatumStatusCounterMXBean{
+
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class);
+
+    private final AtomicLong failed;
+    private final AtomicLong passed;
+
+    /**
+     * Creates a counter with both counts at zero and registers it as an MXBean.
+     * @param id value substituted into {@link #NAME_TEMPLATE} to build the JMX object name
+     * @throws RuntimeException if the object name is malformed or the bean cannot be registered,
+     *                          e.g. because a bean with the same name is already registered
+     */
+    public DatumStatusCounter(String id) {
+        this.failed = new AtomicLong(0);
+        this.passed = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            // The id fills the placeholder; the trailing throwable is logged with its stack trace.
+            LOGGER.error("Failed to register MXBean : {}", id, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Adds one to the failed count.
+     */
+    public void incrementFailedCount() {
+        this.incrementFailedCount(1);
+    }
+
+    /**
+     * Adds {@code delta} to the failed count.
+     * @param delta amount to add
+     */
+    public void incrementFailedCount(long delta) {
+        this.failed.addAndGet(delta);
+    }
+
+    /**
+     * Adds one to the passed count.
+     */
+    public void incrementPassedCount() {
+        this.incrementPassedCount(1);
+    }
+
+    /**
+     * Adds {@code delta} to the passed count.
+     * @param delta amount to add
+     */
+    public void incrementPassedCount(long delta) {
+        this.passed.addAndGet(delta);
+    }
+
+
+    @Override
+    public double getFailRate() {
+        // Read each counter exactly once so the zero guard and the division use the same values.
+        long failedCount = this.failed.get();
+        long passedCount = this.passed.get();
+        if(failedCount == 0 && passedCount == 0) {
+            return 0.0;
+        }
+        return (double) failedCount / ((double) passedCount + (double) failedCount);
+    }
+
+    @Override
+    public long getNumFailed() {
+        return this.failed.get();
+    }
+
+    @Override
+    public long getNumPassed() {
+        return this.passed.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
new file mode 100644
index 0000000..7cc8df4
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ * Management interface exposing the pass/fail counts of datums and the derived failure rate.
+ */
+public interface DatumStatusCounterMXBean {
+
+    /**
+     * Reports how many datums have failed.
+     * @return number of failed datums
+     */
+    long getNumFailed();
+
+    /**
+     * Reports how many datums have passed.
+     * @return number of passed datums
+     */
+    long getNumPassed();
+
+    /**
+     * Reports the failure rate: num failed divided by (num passed + num failed).
+     * @return the failure rate
+     */
+    double getFailRate();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
new file mode 100644
index 0000000..ffd9f25
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import net.jcip.annotations.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.*;
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Counter of datums received, datums emitted and unhandled errors that registers itself with
+ * the platform MBean server so the counts can be read through JMX. Counts are held in
+ * {@link AtomicLong}s.
+ */
+@ThreadSafe
+public class StreamsTaskCounter implements StreamsTaskCounterMXBean{
+
+    // Uses its own "type" key so a task counter and a DatumStatusCounter created with the same
+    // id do not resolve to the same JMX object name.
+    public static final String NAME_TEMPLATE = "org.apache.streams.local:type=StreamsTaskCounter,name=%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class);
+
+    private final AtomicLong emitted;
+    private final AtomicLong received;
+    private final AtomicLong errors;
+
+    /**
+     * Creates a counter with all counts at zero and registers it as an MXBean.
+     * @param id value substituted into {@link #NAME_TEMPLATE} to build the JMX object name
+     * @throws RuntimeException if the object name is malformed or the bean cannot be registered,
+     *                          e.g. because a bean with the same name is already registered
+     */
+    public StreamsTaskCounter(String id) {
+        this.emitted = new AtomicLong(0);
+        this.received = new AtomicLong(0);
+        this.errors = new AtomicLong(0);
+        try {
+            ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id));
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            mbs.registerMBean(this, name);
+        } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) {
+            // The id fills the placeholder; the trailing throwable is logged with its stack trace.
+            LOGGER.error("Failed to register MXBean : {}", id, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Adds one to the emitted count.
+     */
+    public void incrementEmittedCount() {
+        this.incrementEmittedCount(1);
+    }
+
+    /**
+     * Adds {@code delta} to the emitted count.
+     * @param delta amount to add
+     */
+    public void incrementEmittedCount(long delta) {
+        this.emitted.addAndGet(delta);
+    }
+
+    /**
+     * Adds one to the error count.
+     */
+    public void incrementErrorCount() {
+        this.incrementErrorCount(1);
+    }
+
+    /**
+     * Adds {@code delta} to the error count.
+     * @param delta amount to add
+     */
+    public void incrementErrorCount(long delta) {
+        this.errors.addAndGet(delta);
+    }
+
+    /**
+     * Adds one to the received count.
+     */
+    public void incrementReceivedCount() {
+        this.incrementReceivedCount(1);
+    }
+
+    /**
+     * Adds {@code delta} to the received count.
+     * @param delta amount to add
+     */
+    public void incrementReceivedCount(long delta) {
+        this.received.addAndGet(delta);
+    }
+
+    @Override
+    public double getErrorRate() {
+        // Read the received count once so the zero guard and the divisor are the same value.
+        long receivedCount = this.received.get();
+        if(receivedCount == 0) {
+            return 0.0;
+        }
+        return (double) this.errors.get() / (double) receivedCount;
+    }
+
+    @Override
+    public long getNumEmitted() {
+        return this.emitted.get();
+    }
+
+    @Override
+    public long getNumReceived() {
+        return this.received.get();
+    }
+
+    @Override
+    public long getNumUnhandledErrors() {
+        return this.errors.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
new file mode 100644
index 0000000..634857d
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+/**
+ * Management interface exposing the received, emitted and unhandled-error counts of a streams
+ * task together with the derived error rate.
+ */
+public interface StreamsTaskCounterMXBean {
+
+    /**
+     * Get the error rate of the streams process calculated by the number of errors not handled
+     * by the {@link org.apache.streams.local.tasks.StreamsTask} divided by the number of datums
+     * received.
+     * @return error rate
+     */
+    public double getErrorRate();
+
+    /**
+     * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted by the streams
+     * process
+     * @return number of emitted datums
+     */
+    public long getNumEmitted();
+
+    /**
+     * Get the number of {@link org.apache.streams.core.StreamsDatum}s received by the streams
+     * process
+     * @return number of received datums
+     */
+    public long getNumReceived();
+
+    /**
+     * Get the number of errors that the process had to catch because the executing
+     * Provider/Processor/Writer did not catch and handle the exception
+     * @return number of unhandled errors
+     */
+    public long getNumUnhandledErrors();
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index 55a26e1..ea65ac2 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.streams.local.executors;
 
 import org.apache.streams.local.builders.LocalStreamBuilder;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
new file mode 100644
index 0000000..3a9a8dc
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Unit tests for {@link org.apache.streams.local.counters.DatumStatusCounter}
+ */
+public class DatumStatusCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test Constructor can register the counter as an mxbean without throwing an exception.
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new DatumStatusCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor Threw Exception : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test that you can increment passes and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testPassed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementPassedCount();
+        }
+        assertEquals(numIncrements, counter.getNumPassed());
+
+        // The constructor registers under a fixed name, so free it before building a second counter.
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementPassedCount(delta);
+        }
+        assertEquals(total, counter.getNumPassed());
+    }
+
+    /**
+     * Test that you can increment failed and it returns the correct count
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailed() throws Exception {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementFailedCount();
+        }
+        assertEquals(numIncrements, counter.getNumFailed());
+
+        // The constructor registers under a fixed name, so free it before building a second counter.
+        unregisterMXBean();
+
+        counter = new DatumStatusCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementFailedCount(delta);
+        }
+        assertEquals(total, counter.getNumFailed());
+    }
+
+
+    /**
+     * Test failure rate returns expected values
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testFailureRate() {
+        DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getFailRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int passes = randomIntBetween(0, 100000);
+        counter.incrementPassedCount(passes);
+        counter.incrementFailedCount(failures);
+        // Both draws may be 0; the counter reports 0.0 in that case, whereas 0.0 / 0.0 is NaN.
+        int total = passes + failures;
+        double expected = total == 0 ? 0.0 : (double)failures / (double)total;
+        assertEquals(expected, counter.getFailRate(), 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
new file mode 100644
index 0000000..a001845
--- /dev/null
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.local.counters;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import org.junit.After;
+import org.junit.Test;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+/**
+ * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter}
+ */
+public class StreamsTaskCounterTest extends RandomizedTest {
+
+    private static final String MBEAN_ID = "test_id";
+
+    /**
+     * Remove registered mbeans from previous tests
+     * @throws Exception
+     */
+    @After
+    public void unregisterMXBean() throws Exception {
+        try {
+            ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID)));
+        } catch (InstanceNotFoundException ife) {
+            //No-op
+        }
+    }
+
+    /**
+     * Test constructor does not throw errors
+     */
+    @Test
+    public void testConstructor() {
+        try {
+            new StreamsTaskCounter(MBEAN_ID);
+        } catch (Throwable t) {
+            fail("Constructor threw error : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test emitted increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testEmitted() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementEmittedCount();
+        }
+        assertEquals(numIncrements, counter.getNumEmitted());
+
+        // The constructor registers under a fixed name, so free it before building a second counter.
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementEmittedCount(delta);
+        }
+        assertEquals(total, counter.getNumEmitted());
+    }
+
+    /**
+     * Test received increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testReceived() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementReceivedCount();
+        }
+        assertEquals(numIncrements, counter.getNumReceived());
+
+        // The constructor registers under a fixed name, so free it before building a second counter.
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementReceivedCount(delta);
+        }
+        assertEquals(total, counter.getNumReceived());
+    }
+
+    /**
+     * Test errors increments correctly and returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testError() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        int numIncrements = randomIntBetween(1, 100000);
+        for(int i=0; i < numIncrements; ++i) {
+            counter.incrementErrorCount();
+        }
+        assertEquals(numIncrements, counter.getNumUnhandledErrors());
+
+        // The constructor registers under a fixed name, so free it before building a second counter.
+        unregisterMXBean();
+
+        counter = new StreamsTaskCounter(MBEAN_ID);
+        numIncrements = randomIntBetween(1, 100000);
+        long total = 0;
+        for(int i=0; i < numIncrements; ++i) {
+            long delta = randomIntBetween(1, 100);
+            total += delta;
+            counter.incrementErrorCount(delta);
+        }
+        assertEquals(total, counter.getNumUnhandledErrors());
+    }
+
+    /**
+     * Test error rate returns expected value
+     * @throws Exception
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testErrorRate() throws Exception {
+        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID);
+        assertEquals(0.0, counter.getErrorRate(), 0);
+        int failures = randomIntBetween(0, 100000);
+        int received = randomIntBetween(0, 100000);
+        counter.incrementReceivedCount(received);
+        counter.incrementErrorCount(failures);
+        // received may be drawn as 0; the counter reports 0.0 then, whereas dividing by zero
+        // would yield Infinity or NaN and fail the comparison.
+        double expected = received == 0 ? 0.0 : (double)failures / (double)(received);
+        assertEquals(expected, counter.getErrorRate(), 0);
+    }
+
+}

Reply via email to