This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23865 in repository https://gitbox.apache.org/repos/asf/camel.git
commit fdee0944b551149953748c9fa4efc38fd70840f8 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jul 1 10:31:17 2026 +0200 CAMEL-23865: Use EWMA smoothing for throughput calculation Co-Authored-By: Claude <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../camel/management/mbean/LoadThroughput.java | 25 +++--- .../camel/management/LoadThroughputTest.java | 92 ++++++++++++++++++++++ 2 files changed, 106 insertions(+), 11 deletions(-) diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/LoadThroughput.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/LoadThroughput.java index dd05d4595d87..931f75e89639 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/LoadThroughput.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/LoadThroughput.java @@ -19,18 +19,25 @@ package org.apache.camel.management.mbean; import org.apache.camel.util.StopWatch; /** - * Holds the load throughput messages/second + * Holds the throughput (messages/second) using EWMA (exponentially weighted moving average) smoothing, modeled after + * Unix load averages (same approach as {@link LoadTriplet}). + * + * The instantaneous rate from each 1-second sampling interval is smoothed with a 1-minute decay window so that the + * reported value converges to the true average rate instead of oscillating between 0 and spike values. */ public final class LoadThroughput { + // EWMA exponent for a 1-minute decay window, sampled every 1 second + private static final double EXP_1 = Math.exp(-1.0 / 60.0); + private final StopWatch watch = new StopWatch(false); private long last; private double thp; /** - * Update the load statistics + * Update the throughput statistics * - * @param currentReading the current reading + * @param currentReading the current cumulative exchange count */ public void update(long currentReading) { if (!watch.isStarted()) { @@ -40,14 +47,10 @@ public final class LoadThroughput { long time = watch.takenAndRestart(); if (time > 0) { long delta = currentReading - last; - if (delta > 0) { - // need to calculate with fractions - thp = (1000d / time) * delta; - } else { - thp = 0; - } - } else { - thp = 0; + // instantaneous rate in exchanges/second for this interval + double instantRate = (1000d / time) * delta; + // apply EWMA smoothing + thp = instantRate + EXP_1 * (thp - instantRate); } } last = currentReading; diff --git a/core/camel-management/src/test/java/org/apache/camel/management/LoadThroughputTest.java b/core/camel-management/src/test/java/org/apache/camel/management/LoadThroughputTest.java new file mode 100644 index 000000000000..287279b46a71 --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/LoadThroughputTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import org.apache.camel.management.mbean.LoadThroughput; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledOnOs(OS.AIX) +public class LoadThroughputTest { + + @Test + public void testInitialValueIsZero() { + LoadThroughput t = new LoadThroughput(); + assertEquals(0.0, t.getThroughput()); + } + + @Test + public void testConvergesToSteadyRate() throws Exception { + LoadThroughput t = new LoadThroughput(); + + // simulate 1 exchange per second for 120 seconds (well past 1-minute EWMA window) + long total = 0; + t.update(total); + Thread.sleep(10); + for (int i = 0; i < 120; i++) { + total++; + Thread.sleep(10); + t.update(total); + } + + // should converge close to the steady rate + assertTrue(t.getThroughput() > 0, "Throughput should be positive for steady input"); + } + + @Test + public void testSmoothing() throws Exception { + LoadThroughput t = new LoadThroughput(); + + // simulate a timer that fires every 5th update (like a 5s timer with 1s sampling) + // with 10ms sleep, effective rate is 1 exchange per 50ms ≈ 20 exchanges/sec + long total = 0; + t.update(total); + Thread.sleep(10); + for (int i = 1; i <= 100; i++) { + if (i % 5 == 0) { + total++; + } + Thread.sleep(10); + t.update(total); + } + + double thp = t.getThroughput(); + // the smoothed value should be positive and converging toward the average rate (~20/s) + // rather than oscillating between 0 and ~100 (the instantaneous spike) + assertTrue(thp > 1.0, "Smoothed throughput should be well above zero: " + thp); + assertTrue(thp < 80.0, "Smoothed throughput should be below the instantaneous spike: " + thp); + } + + @Test + public void testReset() throws Exception { + LoadThroughput t = new LoadThroughput(); + + t.update(0); + Thread.sleep(10); + t.update(10); + + assertTrue(t.getThroughput() > 0); + + t.reset(); + assertEquals(0.0, t.getThroughput()); + } + +}
