Repository: incubator-streams Updated Branches: refs/heads/master 849a2e8f9 -> 9b3227141
Created MXBean for counts and deprecated DatumStatusCounter and DatumStatusCountable in core Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c030d439 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c030d439 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c030d439 Branch: refs/heads/master Commit: c030d4393ab84fbd1cf484bf8ea06a7902a26964 Parents: 2a635df Author: Ryan Ebanks <[email protected]> Authored: Fri Oct 17 17:13:51 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Fri Oct 17 17:13:51 2014 -0500 ---------------------------------------------------------------------- .../streams/core/DatumStatusCountable.java | 1 + .../apache/streams/core/DatumStatusCounter.java | 2 +- .../local/counters/DatumStatusCounter.java | 88 +++++++++++ .../counters/DatumStatusCounterMXBean.java | 43 +++++ .../local/counters/StreamsTaskCounter.java | 101 ++++++++++++ .../counters/StreamsTaskCounterMXBean.java | 53 +++++++ ...amOnUnhandleThrowableThreadPoolExecutor.java | 17 ++ .../local/counters/DatumStatusCounterTest.java | 134 ++++++++++++++++ .../local/counters/StreamsTaskCounterTest.java | 158 +++++++++++++++++++ 9 files changed, 596 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java index 6a9da4d..3abf30d 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java @@ -21,6 +21,7 @@ package 
org.apache.streams.core; /** * Created by steveblackmon on 3/24/14. */ +@Deprecated public interface DatumStatusCountable { public DatumStatusCounter getDatumStatusCounter(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java index 1bba688..2758ada 100644 --- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java +++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java @@ -17,7 +17,7 @@ */ package org.apache.streams.core; - +@Deprecated public class DatumStatusCounter { private volatile int attempted = 0; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java new file mode 100644 index 0000000..88c3a6f --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.local.counters; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.*; +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + */ +@ThreadSafe +public class DatumStatusCounter implements DatumStatusCounterMXBean{ + + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s"; + private static final Logger LOGGER = LoggerFactory.getLogger(DatumStatusCounter.class); + + private AtomicLong failed; + private AtomicLong passed; + + public DatumStatusCounter(String id) { + this.failed = new AtomicLong(0); + this.passed = new AtomicLong(0); + try { + ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(this, name); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + + public void incrementFailedCount() { + this.incrementFailedCount(1); + } + + public void incrementFailedCount(long delta) { + this.failed.addAndGet(delta); + } + + public void incrementPassedCount() { + this.incrementPassedCount(1); + } + + public void incrementPassedCount(long delta) { + this.passed.addAndGet(delta); + 
} + + + @Override + public double getFailRate() { + double failed = this.failed.get(); + if(failed == 0.0 && this.passed.get() == 0) { + return 0.0; + } + return failed / (this.passed.get() + failed); + } + + @Override + public long getNumFailed() { + return this.failed.get(); + } + + @Override + public long getNumPassed() { + return this.passed.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java new file mode 100644 index 0000000..7cc8df4 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/DatumStatusCounterMXBean.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.streams.local.counters; + +/** + * + */ +public interface DatumStatusCounterMXBean { + + /** + * Get number of failed datums + * @return number of failed datums + */ + public long getNumFailed(); + + /** + * Get number of passed datums + * @return number of passed datums + */ + public long getNumPassed(); + + /** + * Get the failure rate. Calculated by num failed divided by (num passed + num failed) + * @return the failure rate + */ + public double getFailRate(); + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java new file mode 100644 index 0000000..ffd9f25 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.streams.local.counters; + +import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.*; +import java.lang.management.ManagementFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * + */ +@ThreadSafe +public class StreamsTaskCounter implements StreamsTaskCounterMXBean{ + + public static final String NAME_TEMPLATE = "org.apache.streams.local:type=DatumCounter,name=%s"; + private static final Logger LOGGER = LoggerFactory.getLogger(StreamsTaskCounter.class); + + private AtomicLong emitted; + private AtomicLong received; + private AtomicLong errors; + + public StreamsTaskCounter(String id) { + this.emitted = new AtomicLong(0); + this.received = new AtomicLong(0); + this.errors = new AtomicLong(0); + try { + ObjectName name = new ObjectName(String.format(NAME_TEMPLATE, id)); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + mbs.registerMBean(this, name); + } catch (MalformedObjectNameException | InstanceAlreadyExistsException | MBeanRegistrationException | NotCompliantMBeanException e) { + LOGGER.error("Failed to register MXBean : {}", e); + throw new RuntimeException(e); + } + } + + public void incrementEmittedCount() { + this.incrementEmittedCount(1); + } + + public void incrementEmittedCount(long delta) { + this.emitted.addAndGet(delta); + } + + public void incrementErrorCount() { + this.incrementErrorCount(1); + } + + public void incrementErrorCount(long delta) { + this.errors.addAndGet(delta); + } + + public void incrementReceivedCount() { + this.incrementReceivedCount(1); + } + + public void incrementReceivedCount(long delta) { + this.received.addAndGet(delta); + } + + @Override + public double getErrorRate() { + if(this.received.get() == 0) { + return 0.0; + } + return (double) this.errors.get() / (double) this.received.get(); + } + + @Override + public long getNumEmitted() { + return this.emitted.get(); + } + + @Override + public long 
getNumReceived() { + return this.received.get(); + } + + @Override + public long getNumUnhandledErrors() { + return this.errors.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java new file mode 100644 index 0000000..634857d --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/counters/StreamsTaskCounterMXBean.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.local.counters; + +/** + * + */ +public interface StreamsTaskCounterMXBean { + + /** + * Get the error rate of the streams process calculated by the number of errors not handled by the {@link org.apache.streams.local.tasks.StreamsTask} + * divided by the number of datums received. 
+ * @return error rate + */ + public double getErrorRate(); + + /** + * Get the number of {@link org.apache.streams.core.StreamsDatum}s emitted by the streams process + * @return number of emitted datums + */ + public long getNumEmitted(); + + /** + * Get the number of {@link org.apache.streams.core.StreamsDatum}s received by the streams process + * @return number of received datums + */ + public long getNumReceived(); + + /** + * Get the number of errors that the process had to catch because the executing Provider/Processor/Writer did not + * catch and handle the exception + * @return number of unhandled errors + */ + public long getNumUnhandledErrors(); + + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java index 55a26e1..ea65ac2 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.streams.local.executors; import org.apache.streams.local.builders.LocalStreamBuilder; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java new file mode 100644 index 0000000..3a9a8dc --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.streams.local.counters; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.junit.After; +import org.junit.Test; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; + +/** + * + */ +public class DatumStatusCounterTest extends RandomizedTest { + + private static final String MBEAN_ID = "test_id"; + + + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID))); + } catch (InstanceNotFoundException ife) { + //No-op + } + } + + /** + * Test Constructor can register the counter as an mxbean without throwing an exception. 
+ */ + @Test + public void testConstructor() { + try { + new DatumStatusCounter(MBEAN_ID); + } catch (Throwable t) { + fail("Constructor Threw Exception : "+t.getMessage()); + } + } + + /** + * Test that you can increment passes and it returns the correct count + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testPassed() throws Exception { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementPassedCount(); + } + assertEquals(numIncrements, counter.getNumPassed()); + + unregisterMXBean(); + + counter = new DatumStatusCounter(MBEAN_ID); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementPassedCount(delta); + } + assertEquals(total, counter.getNumPassed()); + } + + /** + * Test that you can increment failed and it returns the correct count + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testFailed() throws Exception { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementFailedCount(); + } + assertEquals(numIncrements, counter.getNumFailed()); + + unregisterMXBean(); + + counter = new DatumStatusCounter(MBEAN_ID); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementFailedCount(delta); + } + assertEquals(total, counter.getNumFailed()); + } + + + /** + * Test failure rate returns expected values + */ + @Test + @Repeat(iterations = 3) + public void testFailureRate() { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID); + assertEquals(0.0, counter.getFailRate(), 0); + int failures = randomIntBetween(0, 
100000); + int passes = randomIntBetween(0, 100000); + counter.incrementPassedCount(passes); + counter.incrementFailedCount(failures); + assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c030d439/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java new file mode 100644 index 0000000..a001845 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.streams.local.counters; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import org.junit.After; +import org.junit.Test; + +import javax.management.InstanceNotFoundException; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; + +/** + * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter} + */ +public class StreamsTaskCounterTest extends RandomizedTest { + + private static final String MBEAN_ID = "test_id"; + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID))); + } catch (InstanceNotFoundException ife) { + //No-op + } + } + + /** + * Test constructor does not throw errors + */ + @Test + public void testConstructor() { + try { + new StreamsTaskCounter(MBEAN_ID); + } catch (Throwable t) { + fail("Constructor threw error : "+t.getMessage()); + } + } + + /** + * Test emitted increments correctly and returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testEmitted() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementEmittedCount(); + } + assertEquals(numIncrements, counter.getNumEmitted()); + + unregisterMXBean(); + + counter = new StreamsTaskCounter(MBEAN_ID); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementEmittedCount(delta); + } + assertEquals(total, counter.getNumEmitted()); + } + + /** + * Test received increments correctly and returns expected value + * 
@throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testReceived() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementReceivedCount(); + } + assertEquals(numIncrements, counter.getNumReceived()); + + unregisterMXBean(); + + counter = new StreamsTaskCounter(MBEAN_ID); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementReceivedCount(delta); + } + assertEquals(total, counter.getNumReceived()); + } + + /** + * Test errors increments correctly and returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testError() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementErrorCount(); + } + assertEquals(numIncrements, counter.getNumUnhandledErrors()); + + unregisterMXBean(); + + counter = new StreamsTaskCounter(MBEAN_ID); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementErrorCount(delta); + } + assertEquals(total, counter.getNumUnhandledErrors()); + } + + /** + * Test error rate returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testErrorRate() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID); + assertEquals(0.0, counter.getErrorRate(), 0); + int failures = randomIntBetween(0, 100000); + int received = randomIntBetween(0, 100000); + counter.incrementReceivedCount(received); + counter.incrementErrorCount(failures); + assertEquals((double)failures / (double)(received), counter.getErrorRate(), 
0); + } + +}
