merlimat closed pull request #1523: Use signSafeMod in 
RoundRobinPartitionMessageRouter
URL: https://github.com/apache/incubator-pulsar/pull/1523
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer shows it once the pull request is merged:

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 26f74cf24d..96a82543d5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -168,7 +168,7 @@ public MessageId send(Message<T> message) throws 
PulsarClientException {
 
         int partition = routerPolicy.choosePartition(message, topicMetadata);
         checkArgument(partition >= 0 && partition < 
topicMetadata.numPartitions(),
-                "Illegal partition index chosen by the message routing 
policy");
+                "Illegal partition index chosen by the message routing policy: 
" + partition);
         return producers.get(partition).sendAsync(message);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 87cfd1a436..ee8b9a0428 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.time.Clock;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.client.api.HashingScheme;
@@ -46,35 +48,42 @@
     private final boolean isBatchingEnabled;
     private final long maxBatchingDelayMs;
 
-    @VisibleForTesting
+    private final Clock clock;
+
+    private static final Clock SYSTEM_CLOCK = Clock.systemUTC();
+
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
-                                                int startPtnIdx) {
-        this(hashingScheme, startPtnIdx, false, 0);
+                                                int startPtnIdx,
+                                                boolean isBatchingEnabled,
+                                                long maxBatchingDelayMs) {
+        this(hashingScheme, startPtnIdx, isBatchingEnabled, 
maxBatchingDelayMs, SYSTEM_CLOCK);
     }
 
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
                                                 int startPtnIdx,
                                                 boolean isBatchingEnabled,
-                                                long maxBatchingDelayMs) {
+                                                long maxBatchingDelayMs,
+                                                Clock clock) {
         super(hashingScheme);
         PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
         this.startPtnIdx = startPtnIdx;
         this.isBatchingEnabled = isBatchingEnabled;
         this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+        this.clock = clock;
     }
 
     @Override
     public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
         // If the message has a key, it supersedes the round robin routing 
policy
         if (msg.hasKey()) {
-            return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
+            return signSafeMod(hash.makeHash(msg.getKey()), 
topicMetadata.numPartitions());
         }
 
         if (isBatchingEnabled) { // if batching is enabled, choose partition 
on `maxBatchingDelayMs` boundary.
-            long currentMs = System.currentTimeMillis();
-            return (((int) (currentMs / maxBatchingDelayMs)) + startPtnIdx) % 
topicMetadata.numPartitions();
+            long currentMs = clock.millis();
+            return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, 
topicMetadata.numPartitions());
         } else {
-            return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & 
Integer.MAX_VALUE) % topicMetadata.numPartitions());
+            return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), 
topicMetadata.numPartitions());
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index 9cef8b55ae..aacbe4cda4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.TopicMetadata;
@@ -37,7 +39,7 @@ public SinglePartitionMessageRouterImpl(int partitionIndex, 
HashingScheme hashin
     public int choosePartition(Message<?> msg, TopicMetadata metadata) {
         // If the message has a key, it supersedes the single partition 
routing policy
         if (msg.hasKey()) {
-            return hash.makeHash(msg.getKey()) % metadata.numPartitions();
+            return signSafeMod(hash.makeHash(msg.getKey()), 
metadata.numPartitions());
         }
 
         return partitionIndex;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java
new file mode 100644
index 0000000000..70f0569061
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class MathUtils {
+    /**
+     * Computes {@code dividend % divisor}, shifted into {@code [0, divisor)} when the remainder is negative.
+     *
+     * @param dividend the value to reduce; may be negative
+     * @param divisor the modulus; NOTE(review): presumably always positive (a partition count) -- confirm
+     * @return the remainder, increased by {@code divisor} if Java's {@code %} produced a negative value
+     */
+    public static int signSafeMod(long dividend, int divisor) {
+        int mod = (int) (dividend % divisor); // |remainder| < |divisor|, so the narrowing cast is lossless
+
+        if (mod < 0) { // Java's % takes the sign of the dividend, so a negative dividend gives mod <= 0
+            mod += divisor;
+        }
+
+        return mod;
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
index b6ce4a64b9..3cdebd771d 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
@@ -21,37 +21,94 @@
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.IObjectFactory;
-import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 /**
  * Unit test of {@link RoundRobinPartitionMessageRouterImpl}.
  */
-@PrepareForTest({ RoundRobinPartitionMessageRouterImpl.class })
 public class RoundRobinPartitionMessageRouterImplTest {
 
-    @ObjectFactory
-    public IObjectFactory getObjectFactory() {
-        return new org.powermock.modules.testng.PowerMockObjectFactory();
-    }
-
     @Test
     public void testChoosePartitionWithoutKey() {
         Message<?> msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0);
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, false, 0);
         for (int i = 0; i < 10; i++) {
             assertEquals(i % 5, router.choosePartition(msg, new 
TopicMetadataImpl(5)));
         }
     }
 
+    @Test
+    public void testChoosePartitionWithoutKeyWithBatching() {
+        Message<?> msg = mock(Message.class);
+        when(msg.getKey()).thenReturn(null);
+
+        // Fake clock, simulate 1 millisecond passes for each invocation
+        Clock clock = new Clock() {
+            private long current = 0;
+
+            @Override
+            public Clock withZone(ZoneId zone) {
+                return null;
+            }
+
+            @Override
+            public long millis() {
+                return current++;
+            }
+
+            @Override
+            public Instant instant() {
+                return Instant.ofEpochMilli(millis());
+            }
+
+            @Override
+            public ZoneId getZone() {
+                return ZoneId.systemDefault();
+            }
+        };
+
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, true, 5, clock);
+
+        // Since the batching time is 5millis, first 5 messages will go on 
partition 0 and next five would go on
+        // partition 1
+        for (int i = 0; i < 5; i++) {
+            assertEquals(0, router.choosePartition(msg, new 
TopicMetadataImpl(5)));
+        }
+
+        for (int i = 5; i < 10; i++) {
+            assertEquals(1, router.choosePartition(msg, new 
TopicMetadataImpl(5)));
+        }
+    }
+
+    @Test
+    public void testChoosePartitionWithNegativeTime() {
+        Message<?> msg = mock(Message.class);
+        when(msg.getKey()).thenReturn(null);
+
+        // Fake clock, simulate timestamp that resolves into a negative 
Integer value
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).thenReturn((long) Integer.MAX_VALUE);
+
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 3, true, 5, clock);
+
+        int idx = router.choosePartition(msg, new TopicMetadataImpl(5));
+        assertTrue(idx >= 0);
+        assertTrue(idx < 5);;
+    }
+
     @Test
     public void testChoosePartitionWithKey() {
         String key1 = "key1";
@@ -63,7 +120,8 @@ public void testChoosePartitionWithKey() {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0);
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, false, 0);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, 
metadata));
@@ -75,21 +133,22 @@ public void testBatchingAwareness() throws Exception {
         Message<?> msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        PowerMockito.mockStatic(System.class);
+        Clock clock = mock(Clock.class);
 
-        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0, true, 10);
+        RoundRobinPartitionMessageRouterImpl router = new 
RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, true, 10, clock);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         // time at `12345*` milliseconds
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123450L + 
i);
+            when(clock.millis()).thenReturn(123450L + i);
 
             assertEquals(45, router.choosePartition(msg, metadata));
         }
 
         // time at `12346*` milliseconds
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123460L + 
i);
+            when(clock.millis()).thenReturn(123460L + i);
 
             assertEquals(46, router.choosePartition(msg, metadata));
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
index 805d86e3a9..adeb46b3c8 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.instance;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.time.Clock;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
@@ -33,16 +35,17 @@
     private static final FunctionResultRouter INSTANCE = new 
FunctionResultRouter();
 
     public FunctionResultRouter() {
-        this(Math.abs(ThreadLocalRandom.current().nextInt()));
+        this(Math.abs(ThreadLocalRandom.current().nextInt()), 
Clock.systemUTC());
     }
 
     @VisibleForTesting
-    public FunctionResultRouter(int startPtnIdx) {
+    public FunctionResultRouter(int startPtnIdx, Clock clock) {
         super(
             HashingScheme.Murmur3_32Hash,
             startPtnIdx,
             true,
-            1);
+            1,
+            clock);
     }
 
     public static FunctionResultRouter of() {
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index 497c7bbc36..f98ac4497e 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -22,29 +22,20 @@
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 
+import java.time.Clock;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.Hash;
 import org.apache.pulsar.client.impl.Murmur3_32Hash;
-import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.IObjectFactory;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 /**
  * Unit test of {@link FunctionResultRouter}.
  */
-@PrepareForTest({ RoundRobinPartitionMessageRouterImpl.class })
 public class FunctionResultRouterTest {
 
-    @ObjectFactory
-    public IObjectFactory getObjectFactory() {
-        return new org.powermock.modules.testng.PowerMockObjectFactory();
-    }
-
     private Hash hash;
 
     @BeforeMethod
@@ -61,11 +52,11 @@ public void 
testChoosePartitionWithoutKeyWithoutSequenceId() {
         TopicMetadata topicMetadata = mock(TopicMetadata.class);
         when(topicMetadata.numPartitions()).thenReturn(5);
 
-        PowerMockito.mockStatic(System.class);
+        Clock clock = mock(Clock.class);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123450L + 
i);
+            when(clock.millis()).thenReturn(123450L + i);
             assertEquals(i % 5, router.choosePartition(msg, topicMetadata));
         }
     }
@@ -75,7 +66,9 @@ public void testChoosePartitionWithoutKeySequenceId() {
         TopicMetadata topicMetadata = mock(TopicMetadata.class);
         when(topicMetadata.numPartitions()).thenReturn(5);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
             Message msg = mock(Message.class);
             when(msg.hasKey()).thenReturn(false);
@@ -98,7 +91,9 @@ public void testChoosePartitionWithKeyWithoutSequenceId() {
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn(-1L);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         TopicMetadata metadata = mock(TopicMetadata.class);
         when(metadata.numPartitions()).thenReturn(100);
 
@@ -120,7 +115,9 @@ public void testChoosePartitionWithKeySequenceId() {
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100) 
+ 1));
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         TopicMetadata metadata = mock(TopicMetadata.class);
         when(metadata.numPartitions()).thenReturn(100);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to