This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ae9be46ececa153379bdcedb7fda9f9a404e0d9a Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Tue Sep 23 01:48:44 2025 +0300 [fix][misc] Fix compareTo contract violation for NamespaceBundleStats, TimeAverageMessageData and ResourceUnitRanking (#24772) (cherry picked from commit 75045380de82d0f14f80dfa49e2946ef03b30d60) --- .../loadbalance/impl/SimpleLoadManagerImpl.java | 2 +- .../extensions/models/TopKBundlesTest.java | 25 +++++++ .../data/loadbalancer/NamespaceBundleStats.java | 50 ++++++------- .../loadbalancer/NamespaceBundleStatsTest.java | 48 ++++++++++++ .../org/apache/pulsar/common/util/CompareUtil.java | 61 +++++++++++++++ .../data/loadbalancer/ResourceUnitRanking.java | 10 ++- .../data/loadbalancer/TimeAverageMessageData.java | 38 +++++++++- .../apache/pulsar/common/util/CompareUtilTest.java | 87 ++++++++++++++++++++++ .../loadbalancer/TimeAverageMessageDataTest.java | 44 +++++++++++ 9 files changed, 333 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 8bb27e52298..6ec87b98f3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -820,7 +820,7 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification minLoadPercentage = loadPercentage; } else { if ((unboundedRanks ? ranking.compareMessageRateTo(selectedRanking) - : ranking.compareTo(selectedRanking)) < 0) { + : ranking.compareToOtherRanking(selectedRanking)) < 0) { minLoadPercentage = loadPercentage; selectedRU = candidate; selectedRanking = ranking; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 472d44df890..603c00d566c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -271,4 +271,29 @@ public class TopKBundlesTest { } } } + + // Issue https://github.com/apache/pulsar/issues/24754 + @Test + public void testPartitionSortCompareToContractViolationIssue() { + Random rnd = new Random(0); + ArrayList<NamespaceBundleStats> stats = new ArrayList<>(); + for (int i = 0; i < 1000; ++i) { + NamespaceBundleStats s = new NamespaceBundleStats(); + s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); // Just above threshold (1e5) + s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble())); + s.msgRateIn = 4 * 75 * rnd.nextDouble(); + s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble()); + s.topics = i; + s.consumerCount = i; + s.producerCount = 4 * rnd.nextInt(375); + s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000)); + stats.add(s); + } + List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>(); + + for (NamespaceBundleStats s : stats) { + bundleEntries.add(Map.entry("bundle-" + s.msgThroughputIn, s)); + } + TopKBundles.partitionSort(bundleEntries, 100); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java index cd364990842..d3c894524bf 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java @@ -37,14 +37,14 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats>, S public long topics; public long cacheSize; - // Consider the throughput equal if difference is less than 100 KB/s - private static final double throughputDifferenceThreshold = 1e5; - // Consider the msgRate equal if the difference is less than 100 - private static final double msgRateDifferenceThreshold = 100; - // Consider the total topics/producers/consumers equal if the difference is less than 500 - private static final long topicConnectionDifferenceThreshold = 500; - // Consider the cache size equal if the difference is less than 100 kb - private static final long cacheSizeDifferenceThreshold = 100000; + // When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison + private static final double throughputComparisonResolution = 1e5; + // When comparing message rate, uses a resolution of 100, effectively rounding values before comparison + private static final double msgRateComparisonResolution = 100; + // When comparing total topics/producers/consumers, uses a resolution/rounding of 500 + private static final long topicConnectionComparisonResolution = 500; + // When comparing cache size, uses a resolution/rounding of 100kB + private static final long cacheSizeComparisonResolution = 100000; public NamespaceBundleStats() { reset(); @@ -89,39 +89,33 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats>, S public int compareByMsgRate(NamespaceBundleStats other) { double thisMsgRate = this.msgRateIn + this.msgRateOut; double otherMsgRate = other.msgRateIn + other.msgRateOut; - if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) { - return Double.compare(thisMsgRate, otherMsgRate); - } - return 0; + return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution); + } + + private static int compareDoubleWithResolution(double v1, double v2, double resolution) { + return Long.compare(Math.round(v1 / resolution), Math.round(v2 / resolution)); + } + + private static int compareLongWithResolution(long v1, long v2, long resolution) { + return Long.compare(v1 / resolution, v2 / resolution); } public int compareByTopicConnections(NamespaceBundleStats other) { long thisTopicsAndConnections = this.topics + this.consumerCount + this.producerCount; long otherTopicsAndConnections = other.topics + other.consumerCount + other.producerCount; - if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) > topicConnectionDifferenceThreshold) { - return Long.compare(thisTopicsAndConnections, otherTopicsAndConnections); - } - return 0; + return compareLongWithResolution(thisTopicsAndConnections, otherTopicsAndConnections, + topicConnectionComparisonResolution); } public int compareByCacheSize(NamespaceBundleStats other) { - if (Math.abs(this.cacheSize - other.cacheSize) > cacheSizeDifferenceThreshold) { - return Long.compare(this.cacheSize, other.cacheSize); - } - return 0; + return compareLongWithResolution(cacheSize, other.cacheSize, cacheSizeComparisonResolution); } public int compareByBandwidthIn(NamespaceBundleStats other) { - if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) { - return Double.compare(this.msgThroughputIn, other.msgThroughputIn); - } - return 0; + return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution); } public int compareByBandwidthOut(NamespaceBundleStats other) { - if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) { - return Double.compare(this.msgThroughputOut, other.msgThroughputOut); - } - return 0; + return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution); } } diff --git a/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java new file mode 100644 index 00000000000..7b8d120e629 --- /dev/null +++ b/pulsar-client-admin-api/src/test/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStatsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.policies.data.loadbalancer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.testng.annotations.Test; + +public class NamespaceBundleStatsTest { + + @Test + public void testCompareToContract() { + Random rnd = new Random(); + List<NamespaceBundleStats> stats = new ArrayList<>(); + for (int i = 0; i < 1000; ++i) { + NamespaceBundleStats s = new NamespaceBundleStats(); + s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); + s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble())); + s.msgRateIn = 4 * 75 * rnd.nextDouble(); + s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble()); + s.topics = i; + s.consumerCount = i; + s.producerCount = 4 * rnd.nextInt(375); + s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000)); + stats.add(s); + } + // this would throw "java.lang.IllegalArgumentException: Comparison method violates its general contract!" + // if compareTo() is not implemented correctly. + stats.sort(NamespaceBundleStats::compareTo); + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java new file mode 100644 index 00000000000..b43c42ff1c2 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/CompareUtil.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import lombok.experimental.UtilityClass; + +/** + * Utility class for comparing values. + */ +@UtilityClass +public class CompareUtil { + + /** + * Compare two double values with a given resolution. + * @param v1 first value to compare + * @param v2 second value to compare + * @param resolution resolution to compare with + * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2 + */ + public static int compareDoubleWithResolution(double v1, double v2, double resolution) { + return Long.compare((long) (v1 / resolution), (long) (v2 / resolution)); + } + + /** + * Compare two long values with a given resolution. + * @param v1 first value to compare + * @param v2 second value to compare + * @param resolution resolution to compare with + * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2 + */ + public static int compareLongWithResolution(long v1, long v2, long resolution) { + return Long.compare(v1 / resolution, v2 / resolution); + } + + /** + * Compare two int values with a given resolution. + * @param v1 first value to compare + * @param v2 second value to compare + * @param resolution resolution to compare with + * @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2 + */ + public static int compareIntegerWithResolution(int v1, int v2, int resolution) { + return Integer.compare(v1 / resolution, v2 / resolution); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java index 3a9d7de09e1..9b83be08205 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java @@ -26,7 +26,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota; * The class containing information about system resources, allocated quota, and loaded bundles. */ @EqualsAndHashCode -public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> { +public class ResourceUnitRanking { private static final long KBITS_TO_BYTES = 1024 / 8; private static final double PERCENTAGE_DIFFERENCE_THRESHOLD = 5.0; @@ -129,7 +129,13 @@ public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> { } - public int compareTo(ResourceUnitRanking other) { + /** + * Compares to another ranking. Please note that this cannot be used to sort the rankings since the results + * of this method don't satify the contract of {@link Comparable#compareTo(Object)} + * @param other other ranking to compare to + * @return negative if this is less than other, 0 if they are equal, positive if this is greater than other + */ + public int compareToOtherRanking(ResourceUnitRanking other) { if (Math.abs(this.estimatedLoadPercentage - other.estimatedLoadPercentage) > PERCENTAGE_DIFFERENCE_THRESHOLD) { return Double.compare(this.estimatedLoadPercentage, other.estimatedLoadPercentage); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java index 777a6684ce8..5264c5058fb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java @@ -18,10 +18,14 @@ */ package org.apache.pulsar.policies.data.loadbalancer; +import static org.apache.pulsar.common.util.CompareUtil.compareDoubleWithResolution; +import lombok.EqualsAndHashCode; + /** * Data class comprising the average message data over a fixed period of time. */ -public class TimeAverageMessageData { +@EqualsAndHashCode +public class TimeAverageMessageData implements Comparable<TimeAverageMessageData> { // The maximum number of samples this data will consider. private int maxSamples; @@ -41,6 +45,11 @@ public class TimeAverageMessageData { // The average message rate out per second. private double msgRateOut; + // When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison + private static final double throughputComparisonResolution = 1e5; + // When comparing message rate, uses a resolution of 100, effectively rounding values before comparison + private static final double msgRateComparisonResolution = 100; + // For JSON only. public TimeAverageMessageData() { } @@ -177,4 +186,31 @@ public class TimeAverageMessageData { public double totalMsgThroughput() { return msgThroughputIn + msgThroughputOut; } + + @Override + public int compareTo(TimeAverageMessageData other) { + int result = this.compareByBandwidthIn(other); + + if (result == 0) { + result = this.compareByBandwidthOut(other); + } + if (result == 0) { + result = this.compareByMsgRate(other); + } + return result; + } + + public int compareByMsgRate(TimeAverageMessageData other) { + double thisMsgRate = this.msgRateIn + this.msgRateOut; + double otherMsgRate = other.msgRateIn + other.msgRateOut; + return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution); + } + + public int compareByBandwidthIn(TimeAverageMessageData other) { + return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution); + } + + public int compareByBandwidthOut(TimeAverageMessageData other) { + return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java new file mode 100644 index 00000000000..1c08ee4b2d0 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/CompareUtilTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; + +public class CompareUtilTest { + @Test + public void testCompareDoubleWithResolution_bucketedComparison() { + // Same truncated bucket when dividing by resolution + assertEquals(CompareUtil.compareDoubleWithResolution(0.01, 0.49, 0.5), 0); + assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.99, 0.5), 0); + + // Different truncated buckets + assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.49, 0.5), 1); + assertEquals(CompareUtil.compareDoubleWithResolution(0.49, 0.51, 0.5), -1); + assertEquals(CompareUtil.compareDoubleWithResolution(1.01, 0.49, 0.5), 1); + + // Larger numbers + assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 10.01, 10.0), 0); + assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 20.01, 10.0), -1); + assertEquals(CompareUtil.compareDoubleWithResolution(10.00, 9.99, 10.0), 1); + } + + @Test + public void testCompareLongWithResolution_exactComparison() { + // resolution = 1 -> behave like Long.compare + assertEquals(CompareUtil.compareLongWithResolution(1L, 2L, 1L), -1); + assertEquals(CompareUtil.compareLongWithResolution(2L, 1L, 1L), 1); + assertEquals(CompareUtil.compareLongWithResolution(2L, 2L, 1L), 0); + } + + @Test + public void testCompareLongWithResolution_bucketedComparison() { + // Same bucket when divided by resolution + assertEquals(CompareUtil.compareLongWithResolution(8L, 9L, 10L), 0); + assertEquals(CompareUtil.compareLongWithResolution(10L, 19L, 10L), 0); + + // Different buckets + assertEquals(CompareUtil.compareLongWithResolution(9L, 20L, 10L), -1); + assertEquals(CompareUtil.compareLongWithResolution(21L, 10L, 10L), 1); + + // Larger resolution + assertEquals(CompareUtil.compareLongWithResolution(100L, 175L, 100L), 0); + assertEquals(CompareUtil.compareLongWithResolution(199L, 201L, 100L), -1); + } + + @Test + public void testCompareIntegerWithResolution_exactComparison() { + // resolution = 1 -> behave like Integer.compare + assertEquals(CompareUtil.compareIntegerWithResolution(1, 2, 1), -1); + assertEquals(CompareUtil.compareIntegerWithResolution(2, 1, 1), 1); + assertEquals(CompareUtil.compareIntegerWithResolution(2, 2, 1), 0); + } + + @Test + public void testCompareIntegerWithResolution_bucketedComparison() { + // Same bucket + assertEquals(CompareUtil.compareIntegerWithResolution(3, 4, 5), 0); + assertEquals(CompareUtil.compareIntegerWithResolution(5, 9, 5), 0); + + // Different buckets + assertEquals(CompareUtil.compareIntegerWithResolution(4, 10, 5), -1); + assertEquals(CompareUtil.compareIntegerWithResolution(11, 5, 5), 1); + + // Larger resolution + assertEquals(CompareUtil.compareIntegerWithResolution(51, 75, 50), 0); + assertEquals(CompareUtil.compareIntegerWithResolution(49, 101, 50), -1); + } +} \ No newline at end of file diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java new file mode 100644 index 00000000000..ff7f3083014 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageDataTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.policies.data.loadbalancer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.testng.annotations.Test; + +public class TimeAverageMessageDataTest { + @Test + public void testCompareToContract() { + Random rnd = new Random(); + List<TimeAverageMessageData> list = new ArrayList<>(); + for (int i = 0; i < 1000; ++i) { + TimeAverageMessageData data = new TimeAverageMessageData(1); + double msgThroughputIn = 4 * 75000 * rnd.nextDouble(); + double msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble())); + double msgRateIn = 4 * 75 * rnd.nextDouble(); + double msgRateOut = 75000 - (4 * 75 * rnd.nextDouble()); + data.update(msgThroughputIn, msgThroughputOut, msgRateIn, msgRateOut); + list.add(data); + } + // this would throw "java.lang.IllegalArgumentException: Comparison method violates its general contract!" + // if compareTo() is not implemented correctly. + list.sort(TimeAverageMessageData::compareTo); + } +} \ No newline at end of file