rzo1 commented on code in PR #8586:
URL: https://github.com/apache/storm/pull/8586#discussion_r3201137167


##########
conf/defaults.yaml:
##########
@@ -278,6 +278,7 @@ topology.max.task.parallelism: null
 topology.max.spout.pending: null    # ideally should be larger than topology.producer.batch.size. (esp. if topology.batch.flush.interval.millis=0)
 topology.state.synchronization.timeout.secs: 60
 topology.stats.sample.rate: 0.05
+topology.stats.ewma.enable: false

Review Comment:
   `topology.stats.ewma.smoothing_factor` isn't represented here even though 
it's a tunable. Either add `topology.stats.ewma.smoothing.factor: 0.0625` (or 
`null` with a comment about the `RFC1889_ALPHA` fallback) so operators can 
discover the knob via `defaults.yaml`, or call out the default in the 
`Config.java` Javadoc — right now the `1/16` default is invisible outside the 
source.



##########
storm-client/src/jvm/org/apache/storm/Config.java:
##########
@@ -596,6 +596,20 @@ public class Config extends HashMap<String, Object> {
      */
     @IsPositiveNumber
     public static final String TOPOLOGY_STATS_SAMPLE_RATE = 
"topology.stats.sample.rate";
+    /**
+     * Enabling jitter streaming calculation (RFC 1889a).
+     *
+     * @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 Appendix A.8</a>
+     */
+    @IsBoolean
+    public static final String TOPOLOGY_STATS_EWMA_ENABLE = 
"topology.stats.ewma.enable";
+    /**
+     * The smoothing factor (alpha) used for exponential jitter calculation 
(RFC 1889a).
+     *
+     * @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 Appendix A.8</a>
+     */
+    @CustomValidator(validatorClass = 
ConfigValidation.EwmaSmoothingFactorValidator.class)
+    public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = 
"topology.stats.ewma.smoothing_factor";

Review Comment:
   Underscore breaks Storm's `dot.case` config convention. Every neighbour key 
uses dots (`topology.stats.sample.rate`, 
`topology.builtin.metrics.bucket.size.secs`, …). Suggest 
`topology.stats.ewma.smoothing.factor`. Worth changing now — once released it's 
a public surface.



##########
storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;
+
+import com.codahale.metrics.Gauge;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Lock-free jitter estimator following RFC 1889 Section 6.3.1.

Review Comment:
   RFC 1889 §6.3.1 describes the *Sender Report* packet. It defines the 
interarrival jitter field, but the estimator code this class follows is in 
§A.8. Suggest: `Lock-free jitter estimator following RFC 1889 §A.8 / RFC 3550 
§A.8.`



##########
storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java:
##########
@@ -27,19 +29,26 @@ public class TaskMetrics {
     private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
     private static final String METRIC_NAME_EXECUTED = "__execute-count";
     private static final String METRIC_NAME_PROCESS_LATENCY = 
"__process-latency";
+    private static final String METRIC_NAME_PROCESS_RFC_1889a_JITTER = 
"__process-rfc1889a-jitter";
     private static final String METRIC_NAME_COMPLETE_LATENCY = 
"__complete-latency";
+    private static final String METRIC_NAME_COMPLETE_RFC_1889a_JITTER = 
"__complete-rfc1889a-jitter";
     private static final String METRIC_NAME_EXECUTE_LATENCY = 
"__execute-latency";
+    private static final String METRIC_NAME_EXECUTE_RFC_1889a_JITTER = 
"__execute-rfc1889a-jitter";

Review Comment:
   Two issues with these metric names:
   
   1. **`rfc1889a` suffix isn't a real RFC label** — the algorithm is in 
*Appendix A.8* of RFC 1889. The `a` reads like a version suffix. Suggest 
`__complete-rfc1889-jitter` etc., or simply `__complete-jitter` (the RFC 
reference belongs in `Metrics.md`, not in every metric series name).
   2. **Inconsistent identifier casing**: the constants spell it `RFC_1889a` 
(a lowercase `a` inside an otherwise upper-case name). The convention in this 
file is all-uppercase: `RFC_1889_JITTER` or `RFC1889_JITTER`.
   
   These metric names become public API the moment the PR ships — much easier 
to fix now than after dashboards are wired up.



##########
storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java:
##########
@@ -27,19 +29,26 @@ public class TaskMetrics {
     private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
     private static final String METRIC_NAME_EXECUTED = "__execute-count";
     private static final String METRIC_NAME_PROCESS_LATENCY = 
"__process-latency";
+    private static final String METRIC_NAME_PROCESS_RFC_1889a_JITTER = 
"__process-rfc1889a-jitter";
     private static final String METRIC_NAME_COMPLETE_LATENCY = 
"__complete-latency";
+    private static final String METRIC_NAME_COMPLETE_RFC_1889a_JITTER = 
"__complete-rfc1889a-jitter";
     private static final String METRIC_NAME_EXECUTE_LATENCY = 
"__execute-latency";
+    private static final String METRIC_NAME_EXECUTE_RFC_1889a_JITTER = 
"__execute-rfc1889a-jitter";
     private static final String METRIC_NAME_CAPACITY = "__capacity";
 
     private final ConcurrentMap<String, RateCounter> rateCounters = new 
ConcurrentHashMap<>();
-    private final ConcurrentMap<String, RollingAverageGauge> gauges = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Gauge> gauges = new 
ConcurrentHashMap<>();

Review Comment:
   Going to a raw `ConcurrentMap<String, Gauge>` loses type info that the 
previous `ConcurrentMap<String, RollingAverageGauge>` had. Cleaner would be 
`ConcurrentMap<String, Gauge<?>>` — that keeps the wildcard typing, lets 
`getOrCreateGauge` work unchanged, and the only raw cast moves to 
`registerGauge` where you already have `@SuppressWarnings({"unchecked", 
"rawtypes"})`.

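   A minimal sketch of the wildcard-typed map, to show the shape I have in 
mind. Everything here is illustrative: the local `Gauge` interface stands in 
for `com.codahale.metrics.Gauge`, `FixedGauge` and `GaugeMapSketch` are 
hypothetical names, and the `getOrCreateGauge` signature is simplified (no 
`streamId`, no registry call) compared to the one in this PR.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Stand-in for com.codahale.metrics.Gauge so the sketch compiles on its own.
interface Gauge<T> {
    T getValue();
}

// Hypothetical gauge type used only for this illustration.
final class FixedGauge implements Gauge<Double> {
    private final double value;

    FixedGauge(double value) {
        this.value = value;
    }

    @Override
    public Double getValue() {
        return value;
    }
}

final class GaugeMapSketch {
    // Wildcard-typed values: no raw type, so no rawtypes warning on the field.
    private final ConcurrentMap<String, Gauge<?>> gauges = new ConcurrentHashMap<>();

    // Returns the gauge registered under name, creating it on first use.
    // Class.cast is a checked cast, so a name already bound to a different
    // gauge type fails with ClassCastException instead of returning the
    // wrong type unnoticed.
    <G extends Gauge<?>> G getOrCreateGauge(String name, Class<G> type, Supplier<G> factory) {
        Gauge<?> gauge = gauges.computeIfAbsent(name, k -> factory.get());
        return type.cast(gauge);
    }
}
```

   With this shape the field and the lookup path need no `@SuppressWarnings`; 
an unchecked cast would only be needed where the gauge is handed to an API 
that wants a raw or differently parameterized type.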


##########
storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java:
##########
@@ -135,18 +166,54 @@ private RateCounter getRateCounter(String metricName, 
String streamId) {
     }
 
     private RollingAverageGauge getRollingAverageGauge(String metricName, 
String streamId) {
-        RollingAverageGauge gauge = this.gauges.get(metricName);
-        if (gauge == null) {
+        return getOrCreateGauge(metricName, streamId, 
RollingAverageGauge.class, this.rollingAverageGaugeFactory);
+    }
+
+    private EwmaGauge getExponentialWeightedMobileAverageGauge(String 
metricName, String streamId) {

Review Comment:
   Typo: "Mobile" → "Moving". EWMA = Exponentially Weighted **Moving** Average. 
Only the helper method name needs the fix, e.g. 
`getExponentialWeightedMovingAverageGauge`; the field name 
`ewmaSmoothingFactor` is fine.



##########
storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java:
##########
@@ -849,6 +849,23 @@ public void validateField(String name, Object o) {
         }
     }
 
+    public static class EwmaSmoothingFactorValidator extends Validator {
+        @Override
+        public void validateField(String name, Object o) {
+            if (o == null) {
+                return;
+            }
+            if (o instanceof Number) {
+                double alpha = ((Number) o).doubleValue();
+                if (alpha > 0.0 && alpha < 1.0) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException(
+                    "Field " + name + " must be a number in the open interval 
(0, 1), got: " + o);

Review Comment:
   This validator only accepts `Number`, but `ConfigUtils.ewmaSmoothingFactor` 
calls `ObjectReader.getDouble(value)` which also parses strings like `"0.5"`. 
Other numeric configs in Storm (e.g. `topology.stats.sample.rate`) accept 
stringified numbers from YAML, so the asymmetry is surprising — a value would 
be rejected at validation time but accepted at runtime if validation is 
bypassed (programmatic conf, tests, etc.).
   
   Suggest mirroring what the runtime does: try `ObjectReader.getDouble(o)` 
inside a try/catch, then range-check. That way the validator and the runtime 
parser agree.
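
   Roughly what I mean, as a self-contained sketch. `toDouble` is a 
hypothetical stand-in for `ObjectReader.getDouble`; I am assuming it accepts 
`Number` and numeric strings and throws `IllegalArgumentException` otherwise, 
so please check that against the real helper. The class name is made up too.

```java
// Sketch of a validator that accepts the same inputs as the runtime parser.
final class SmoothingFactorValidatorSketch {

    // Hypothetical stand-in for ObjectReader.getDouble: accepts Number and
    // numeric strings, rejects everything else.
    static double toDouble(Object o) {
        if (o instanceof Number) {
            return ((Number) o).doubleValue();
        }
        if (o instanceof String) {
            // Throws NumberFormatException for non-numeric text.
            return Double.parseDouble(((String) o).trim());
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to double");
    }

    static void validateField(String name, Object o) {
        if (o == null) {
            return; // unset: the caller falls back to the default
        }
        double alpha;
        try {
            alpha = toDouble(o);
        } catch (IllegalArgumentException e) { // also covers NumberFormatException
            throw new IllegalArgumentException(
                "Field " + name + " must be a number in the open interval (0, 1), got: " + o, e);
        }
        // NaN fails both comparisons, so it is rejected here as well.
        if (!(alpha > 0.0 && alpha < 1.0)) {
            throw new IllegalArgumentException(
                "Field " + name + " must be a number in the open interval (0, 1), got: " + o);
        }
    }
}
```

   The point is that parsing and range-checking go through one code path, so 
`0.5` and `"0.5"` are treated the same at validation time and at runtime.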



##########
storm-client/src/jvm/org/apache/storm/Config.java:
##########
@@ -596,6 +596,20 @@ public class Config extends HashMap<String, Object> {
      */
     @IsPositiveNumber
     public static final String TOPOLOGY_STATS_SAMPLE_RATE = 
"topology.stats.sample.rate";
+    /**
+     * Enabling jitter streaming calculation (RFC 1889a).

Review Comment:
   Minor: "RFC 1889a" isn't a real RFC label — the algorithm lives in 
**Appendix A.8** of RFC 1889 (and was carried unchanged into RFC 3550 §A.8). 
Suggest "RFC 1889 §A.8" or "RFC 3550 §A.8" everywhere it appears in this PR 
(Javadoc, metric names, docs). RFC 1889 has been obsolete since 2003, so a 
forward link to RFC 3550 is friendlier to readers.



##########
storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;
+
+import com.codahale.metrics.Gauge;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Lock-free jitter estimator following RFC 1889 Section 6.3.1.
+ * The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong
+ * so that CAS can be used without locks.
+ * Thread safety: addValue is lock-free; getValue is wait-free.
+ */
+public class EwmaGauge implements Gauge<Double> {
+
+    private static final long UNSEEDED = Long.MIN_VALUE;
+    private static final long ZERO_BITS = Double.doubleToLongBits(0.0);
+
+    private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
+    private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS);
+    private final double alpha;
+
+    EwmaGauge(double alpha) {
+        if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) {
+            throw new IllegalArgumentException(
+                    "alpha must be in (0, 1), got: " + alpha);
+        }
+        this.alpha = alpha;
+    }
+
+    EwmaGauge() {
+        this(RFC1889_ALPHA);  // 1.0 / 16.0
+    }
+
+    /**
+     * Update the jitter estimate.
+     *
+     * @param transitMs transit time for this tuple: {@code arrival - 
timestamp}
+     *                  Negative values are silently ignored.
+     */
+    public void addValue(long transitMs) {
+        if (transitMs < 0) {
+            return;
+        }
+
+        // Seed on the very first packet: store transit, nothing to diff 
against yet.
+        if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
+            return;
+        }
+
+        long prev = lastTransit.getAndSet(transitMs);
+        if (prev == UNSEEDED) {
+            // Lost a race during seeding; prev is not a real transit value.
+            return;
+        }
+
+        double d = Math.abs(transitMs - prev);

Review Comment:
   Worth a one-line comment that this is safe from the well-known 
`Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE` pathology *because* `transitMs >= 
0 && prev >= 0` (enforced by the negative-guard at line 55 + the seed path), so 
`transitMs - prev ∈ [-Long.MAX_VALUE, Long.MAX_VALUE]`. Otherwise a future 
reader is going to wonder.
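
   For reference, a small standalone demonstration of the pathology and of 
why the non-negative guard avoids it. `AbsOverflowSketch` and `absDiff` are 
hypothetical names for illustration only, not something from this PR.

```java
final class AbsOverflowSketch {

    // Absolute difference of two transit times. Safe only because both inputs
    // are non-negative: the subtraction then stays within
    // [-Long.MAX_VALUE, Long.MAX_VALUE] and can never produce Long.MIN_VALUE.
    static long absDiff(long transitMs, long prev) {
        if (transitMs < 0 || prev < 0) {
            throw new IllegalArgumentException("transit times must be non-negative");
        }
        return Math.abs(transitMs - prev);
    }

    public static void main(String[] args) {
        // The pathology: the magnitude of Long.MIN_VALUE does not fit in a long.
        System.out.println(Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE); // true
        // Extreme non-negative inputs still give a non-negative result.
        System.out.println(absDiff(0L, Long.MAX_VALUE) == Long.MAX_VALUE); // true
    }
}
```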



##########
storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;
+
+import com.codahale.metrics.Gauge;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Lock-free jitter estimator following RFC 1889 Section 6.3.1.
+ * The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong
+ * so that CAS can be used without locks.
+ * Thread safety: addValue is lock-free; getValue is wait-free.
+ */
+public class EwmaGauge implements Gauge<Double> {
+
+    private static final long UNSEEDED = Long.MIN_VALUE;
+    private static final long ZERO_BITS = Double.doubleToLongBits(0.0);
+
+    private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
+    private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS);
+    private final double alpha;
+
+    EwmaGauge(double alpha) {
+        if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) {
+            throw new IllegalArgumentException(
+                    "alpha must be in (0, 1), got: " + alpha);
+        }
+        this.alpha = alpha;
+    }
+
+    EwmaGauge() {
+        this(RFC1889_ALPHA);  // 1.0 / 16.0
+    }
+
+    /**
+     * Update the jitter estimate.
+     *
+     * @param transitMs transit time for this tuple: {@code arrival - 
timestamp}
+     *                  Negative values are silently ignored.
+     */
+    public void addValue(long transitMs) {
+        if (transitMs < 0) {
+            return;
+        }
+
+        // Seed on the very first packet: store transit, nothing to diff 
against yet.
+        if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
+            return;
+        }
+
+        long prev = lastTransit.getAndSet(transitMs);
+        if (prev == UNSEEDED) {
+            // Lost a race during seeding; prev is not a real transit value.
+            return;
+        }
+
+        double d = Math.abs(transitMs - prev);
+
+        long currentBits;
+        long updatedBits;
+        do {
+            currentBits = jitterBits.get();
+            double currentJitter = Double.longBitsToDouble(currentBits);
+            double updatedJitter = currentJitter + alpha * (d - currentJitter);
+            updatedBits = Double.doubleToLongBits(updatedJitter);
+        } while (!jitterBits.compareAndSet(currentBits, updatedBits));
+    }
+
+    /**
+     * Returns the current jitter estimate in timestamp units.
+     */
+    @Override
+    public Double getValue() {
+        return Double.longBitsToDouble(jitterBits.get());
+    }
+}

Review Comment:
   Missing newline at EOF. Same in `EwmaGaugeTest.java` and 
`TaskMetricsTest.java` — checkstyle will probably flag these.



##########
storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;
+
+import com.codahale.metrics.Gauge;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Lock-free jitter estimator following RFC 1889 Section 6.3.1.
+ * The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong
+ * so that CAS can be used without locks.
+ * Thread safety: addValue is lock-free; getValue is wait-free.
+ */
+public class EwmaGauge implements Gauge<Double> {
+
+    private static final long UNSEEDED = Long.MIN_VALUE;
+    private static final long ZERO_BITS = Double.doubleToLongBits(0.0);
+
+    private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
+    private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS);
+    private final double alpha;
+
+    EwmaGauge(double alpha) {
+        if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) {
+            throw new IllegalArgumentException(
+                    "alpha must be in (0, 1), got: " + alpha);
+        }
+        this.alpha = alpha;
+    }
+
+    EwmaGauge() {
+        this(RFC1889_ALPHA);  // 1.0 / 16.0
+    }
+
+    /**
+     * Update the jitter estimate.
+     *
+     * @param transitMs transit time for this tuple: {@code arrival - 
timestamp}
+     *                  Negative values are silently ignored.
+     */
+    public void addValue(long transitMs) {
+        if (transitMs < 0) {
+            return;
+        }
+
+        // Seed on the very first packet: store transit, nothing to diff 
against yet.
+        if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
+            return;
+        }
+
+        long prev = lastTransit.getAndSet(transitMs);
+        if (prev == UNSEEDED) {
+            // Lost a race during seeding; prev is not a real transit value.
+            return;
+        }

Review Comment:
   Nothing in this class ever resets `lastTransit` back to `UNSEEDED`, so once 
the seed CAS succeeds, `getAndSet` can't return `UNSEEDED` — this branch is 
unreachable. Either drop it, or keep it and add a comment that it's defensive 
against a future caller adding a reset path. Right now the comment ("Lost a 
race during seeding") is incorrect: the seed CAS is non-revertible, so a thread 
that lost the seed race still observes a real `transitMs` here.
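
   If you go with dropping the branch, the update path could look something 
like the sketch below. It is a trimmed restatement of the quoted class under a 
hypothetical name (`EwmaJitterSketch`), without the `Gauge` interface or the 
default constructor, so treat it as an illustration rather than a drop-in 
replacement.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified restatement of the quoted class with the unreachable branch removed.
final class EwmaJitterSketch {
    private static final long UNSEEDED = Long.MIN_VALUE;

    private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
    private final AtomicLong jitterBits = new AtomicLong(Double.doubleToLongBits(0.0));
    private final double alpha;

    EwmaJitterSketch(double alpha) {
        // Written as a negated range check so that NaN is rejected too.
        if (!(alpha > 0.0 && alpha < 1.0)) {
            throw new IllegalArgumentException("alpha must be in (0, 1), got: " + alpha);
        }
        this.alpha = alpha;
    }

    void addValue(long transitMs) {
        if (transitMs < 0) {
            return;
        }
        // First sample: store it and return, there is nothing to diff against.
        if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
            return;
        }
        // The CAS failed, so lastTransit already holds a non-negative transit.
        // Only non-negative values are ever written, so prev cannot be UNSEEDED.
        long prev = lastTransit.getAndSet(transitMs);
        double d = Math.abs(transitMs - prev);

        long currentBits;
        long updatedBits;
        do {
            currentBits = jitterBits.get();
            double current = Double.longBitsToDouble(currentBits);
            updatedBits = Double.doubleToLongBits(current + alpha * (d - current));
        } while (!jitterBits.compareAndSet(currentBits, updatedBits));
    }

    double getValue() {
        return Double.longBitsToDouble(jitterBits.get());
    }
}
```

   With `alpha = 0.5`, feeding transits 100, 110, 110 leaves the estimate at 
0.0 after the seed, 5.0 after the second sample, and 2.5 after the third.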



##########
storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p>

Review Comment:
   `<p>` tags in the license header are inconsistent with the rest of 
`storm-client` (the standard ASF header uses blank lines between paragraphs — 
see e.g. `TaskMetrics.java` in this same PR). Same applies to 
`EwmaGaugeTest.java` (`<p/>`) and `TaskMetricsTest.java` (`<p/>`). Looks like 
an IDE auto-formatter artifact.



##########
storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class TaskMetricsTest {
+
+    private static final String TOPOLOGY_ID   = "test-topology-1";
+    private static final String COMPONENT_ID  = "test-bolt";
+    private static final Integer TASK_ID      = 42;
+    private static final Integer WORKER_PORT  = 6700;
+    private static final String STREAM_ID     = "default";
+    private static final String SOURCE_COMP   = "source-spout";
+    private static final int    SAMPLING_RATE = 1;
+    private static final double EWMA_FACTOR   = 0.3;
+
+    @Mock private WorkerTopologyContext context;
+    @Mock private StormMetricRegistry   metricRegistry;
+    @Mock private RateCounter           rateCounter;
+    @Mock private RollingAverageGauge   rollingAverageGauge;
+    @Mock private EwmaGauge ewmaGauge;

Review Comment:
   These two `@Mock` fields are declared but never referenced anywhere in the 
test class — Mockito still spends init cost on them. Drop both.


