This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da1a110  Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)
da1a110 is described below

commit da1a11043c9e4591c6c5c1d8e6434ccc59b913b0
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Apr 9 10:19:00 2018 -0700

    Use signSafeMod in RoundRobinPartitionMessageRouter (#1523)
    
    * Use signSafeMod in RoundRobinPartitionMessageRouter
    
    * Added test with mocked clock
    
    * Fixed tests
    
    * Fixed functions test
---
 .../client/impl/PartitionedProducerImpl.java       |  2 +-
 .../impl/RoundRobinPartitionMessageRouterImpl.java | 27 ++++---
 .../impl/SinglePartitionMessageRouterImpl.java     |  4 +-
 .../org/apache/pulsar/client/util/MathUtils.java   | 41 ++++++++++
 .../RoundRobinPartitionMessageRouterImplTest.java  | 91 ++++++++++++++++++----
 .../functions/instance/FunctionResultRouter.java   |  9 ++-
 .../instance/FunctionResultRouterTest.java         | 31 ++++----
 7 files changed, 158 insertions(+), 47 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 26f74cf..96a8254 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -168,7 +168,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
         int partition = routerPolicy.choosePartition(message, topicMetadata);
         checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
-                "Illegal partition index chosen by the message routing policy");
+                "Illegal partition index chosen by the message routing policy: " + partition);
         return producers.get(partition).sendAsync(message);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index 87cfd1a..ee8b9a0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.annotations.VisibleForTesting;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.time.Clock;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.pulsar.client.api.HashingScheme;
@@ -46,35 +48,42 @@ public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
     private final boolean isBatchingEnabled;
     private final long maxBatchingDelayMs;
 
-    @VisibleForTesting
+    private final Clock clock;
+
+    private static final Clock SYSTEM_CLOCK = Clock.systemUTC();
+
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
-                                                int startPtnIdx) {
-        this(hashingScheme, startPtnIdx, false, 0);
+                                                int startPtnIdx,
+                                                boolean isBatchingEnabled,
+                                                long maxBatchingDelayMs) {
+        this(hashingScheme, startPtnIdx, isBatchingEnabled, maxBatchingDelayMs, SYSTEM_CLOCK);
     }
 
     public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme,
                                                 int startPtnIdx,
                                                 boolean isBatchingEnabled,
-                                                long maxBatchingDelayMs) {
+                                                long maxBatchingDelayMs,
+                                                Clock clock) {
         super(hashingScheme);
         PARTITION_INDEX_UPDATER.set(this, startPtnIdx);
         this.startPtnIdx = startPtnIdx;
         this.isBatchingEnabled = isBatchingEnabled;
         this.maxBatchingDelayMs = Math.max(1, maxBatchingDelayMs);
+        this.clock = clock;
     }
 
     @Override
     public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
         // If the message has a key, it supersedes the round robin routing policy
         if (msg.hasKey()) {
-            return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions();
+            return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
         }
 
         if (isBatchingEnabled) { // if batching is enabled, choose partition on `maxBatchingDelayMs` boundary.
-            long currentMs = System.currentTimeMillis();
-            return (((int) (currentMs / maxBatchingDelayMs)) + startPtnIdx) % topicMetadata.numPartitions();
+            long currentMs = clock.millis();
+            return signSafeMod(currentMs / maxBatchingDelayMs + startPtnIdx, topicMetadata.numPartitions());
         } else {
-            return ((PARTITION_INDEX_UPDATER.getAndIncrement(this) & Integer.MAX_VALUE) % topicMetadata.numPartitions());
+            return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index 9cef8b5..aacbe4c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.TopicMetadata;
@@ -37,7 +39,7 @@ public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
     public int choosePartition(Message<?> msg, TopicMetadata metadata) {
         // If the message has a key, it supersedes the single partition routing policy
         if (msg.hasKey()) {
-            return hash.makeHash(msg.getKey()) % metadata.numPartitions();
+            return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());
         }
 
         return partitionIndex;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java
new file mode 100644
index 0000000..70f0569
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MathUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
+public class MathUtils {
+    /**
+     * Compute sign safe mod
+     *
+     * @param dividend
+     * @param divisor
+     * @return
+     */
+    public static int signSafeMod(long dividend, int divisor) {
+        int mod = (int) (dividend % divisor);
+
+        if (mod < 0) {
+            mod += divisor;
+        }
+
+        return mod;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
index b6ce4a6..3cdebd7 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
@@ -21,38 +21,95 @@ package org.apache.pulsar.client.impl;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.IObjectFactory;
-import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 /**
  * Unit test of {@link RoundRobinPartitionMessageRouterImpl}.
  */
-@PrepareForTest({ RoundRobinPartitionMessageRouterImpl.class })
 public class RoundRobinPartitionMessageRouterImplTest {
 
-    @ObjectFactory
-    public IObjectFactory getObjectFactory() {
-        return new org.powermock.modules.testng.PowerMockObjectFactory();
-    }
-
     @Test
     public void testChoosePartitionWithoutKey() {
         Message<?> msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0);
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, false, 0);
         for (int i = 0; i < 10; i++) {
             assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5)));
         }
     }
 
     @Test
+    public void testChoosePartitionWithoutKeyWithBatching() {
+        Message<?> msg = mock(Message.class);
+        when(msg.getKey()).thenReturn(null);
+
+        // Fake clock, simulate 1 millisecond passes for each invocation
+        Clock clock = new Clock() {
+            private long current = 0;
+
+            @Override
+            public Clock withZone(ZoneId zone) {
+                return null;
+            }
+
+            @Override
+            public long millis() {
+                return current++;
+            }
+
+            @Override
+            public Instant instant() {
+                return Instant.ofEpochMilli(millis());
+            }
+
+            @Override
+            public ZoneId getZone() {
+                return ZoneId.systemDefault();
+            }
+        };
+
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, true, 5, clock);
+
+        // Since the batching time is 5millis, first 5 messages will go on partition 0 and next five would go on
+        // partition 1
+        for (int i = 0; i < 5; i++) {
+            assertEquals(0, router.choosePartition(msg, new TopicMetadataImpl(5)));
+        }
+
+        for (int i = 5; i < 10; i++) {
+            assertEquals(1, router.choosePartition(msg, new TopicMetadataImpl(5)));
+        }
+    }
+
+    @Test
+    public void testChoosePartitionWithNegativeTime() {
+        Message<?> msg = mock(Message.class);
+        when(msg.getKey()).thenReturn(null);
+
+        // Fake clock, simulate timestamp that resolves into a negative Integer value
+        Clock clock = mock(Clock.class);
+        when(clock.millis()).thenReturn((long) Integer.MAX_VALUE);
+
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 3, true, 5, clock);
+
+        int idx = router.choosePartition(msg, new TopicMetadataImpl(5));
+        assertTrue(idx >= 0);
+        assertTrue(idx < 5);;
+    }
+
+    @Test
     public void testChoosePartitionWithKey() {
         String key1 = "key1";
         String key2 = "key2";
@@ -63,7 +120,8 @@ public class RoundRobinPartitionMessageRouterImplTest {
         when(msg2.hasKey()).thenReturn(true);
         when(msg2.getKey()).thenReturn(key2);
 
-        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0);
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, false, 0);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));
@@ -75,21 +133,22 @@ public class RoundRobinPartitionMessageRouterImplTest {
         Message<?> msg = mock(Message.class);
         when(msg.getKey()).thenReturn(null);
 
-        PowerMockito.mockStatic(System.class);
+        Clock clock = mock(Clock.class);
 
-        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash, 0, true, 10);
+        RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(
+                HashingScheme.JavaStringHash, 0, true, 10, clock);
         TopicMetadataImpl metadata = new TopicMetadataImpl(100);
 
         // time at `12345*` milliseconds
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123450L + i);
+            when(clock.millis()).thenReturn(123450L + i);
 
             assertEquals(45, router.choosePartition(msg, metadata));
         }
 
         // time at `12346*` milliseconds
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123460L + i);
+            when(clock.millis()).thenReturn(123460L + i);
 
             assertEquals(46, router.choosePartition(msg, metadata));
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
index 805d86e..adeb46b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.instance;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.time.Clock;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.Message;
@@ -33,16 +35,17 @@ public class FunctionResultRouter extends RoundRobinPartitionMessageRouterImpl {
     private static final FunctionResultRouter INSTANCE = new FunctionResultRouter();
 
     public FunctionResultRouter() {
-        this(Math.abs(ThreadLocalRandom.current().nextInt()));
+        this(Math.abs(ThreadLocalRandom.current().nextInt()), Clock.systemUTC());
     }
 
     @VisibleForTesting
-    public FunctionResultRouter(int startPtnIdx) {
+    public FunctionResultRouter(int startPtnIdx, Clock clock) {
         super(
             HashingScheme.Murmur3_32Hash,
             startPtnIdx,
             true,
-            1);
+            1,
+            clock);
     }
 
     public static FunctionResultRouter of() {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
index 497c7bb..f98ac44 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/FunctionResultRouterTest.java
@@ -22,29 +22,20 @@ import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.when;
 import static org.testng.Assert.assertEquals;
 
+import java.time.Clock;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.Hash;
 import org.apache.pulsar.client.impl.Murmur3_32Hash;
-import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.IObjectFactory;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 /**
  * Unit test of {@link FunctionResultRouter}.
  */
-@PrepareForTest({ RoundRobinPartitionMessageRouterImpl.class })
 public class FunctionResultRouterTest {
 
-    @ObjectFactory
-    public IObjectFactory getObjectFactory() {
-        return new org.powermock.modules.testng.PowerMockObjectFactory();
-    }
-
     private Hash hash;
 
     @BeforeMethod
@@ -61,11 +52,11 @@ public class FunctionResultRouterTest {
         TopicMetadata topicMetadata = mock(TopicMetadata.class);
         when(topicMetadata.numPartitions()).thenReturn(5);
 
-        PowerMockito.mockStatic(System.class);
+        Clock clock = mock(Clock.class);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
-            PowerMockito.when(System.currentTimeMillis()).thenReturn(123450L + i);
+            when(clock.millis()).thenReturn(123450L + i);
             assertEquals(i % 5, router.choosePartition(msg, topicMetadata));
         }
     }
@@ -75,7 +66,9 @@ public class FunctionResultRouterTest {
         TopicMetadata topicMetadata = mock(TopicMetadata.class);
         when(topicMetadata.numPartitions()).thenReturn(5);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         for (int i = 0; i < 10; i++) {
             Message msg = mock(Message.class);
             when(msg.hasKey()).thenReturn(false);
@@ -98,7 +91,9 @@ public class FunctionResultRouterTest {
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn(-1L);
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         TopicMetadata metadata = mock(TopicMetadata.class);
         when(metadata.numPartitions()).thenReturn(100);
 
@@ -120,7 +115,9 @@ public class FunctionResultRouterTest {
         when(msg2.getKey()).thenReturn(key2);
         when(msg1.getSequenceId()).thenReturn((long) ((key2.hashCode() % 100) + 1));
 
-        FunctionResultRouter router = new FunctionResultRouter(0);
+        Clock clock = mock(Clock.class);
+
+        FunctionResultRouter router = new FunctionResultRouter(0, clock);
         TopicMetadata metadata = mock(TopicMetadata.class);
         when(metadata.numPartitions()).thenReturn(100);
 

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to