This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 291e854  SAMZA-2502: Byte array keys be partitioned based on array contents in… (#1338)
291e854 is described below

commit 291e8540bec120363df07bcd4468d446590e0e0f
Author: Yixing Zhang <[email protected]>
AuthorDate: Thu Apr 2 18:21:39 2020 -0700

    SAMZA-2502: Byte array keys be partitioned based on array contents in… (#1338)
---
 .../system/inmemory/InMemorySystemProducer.java    | 11 ++-
 .../inmemory/TestInMemorySystemProducer.java       | 83 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
index 246a558..342d2c8 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java
@@ -20,6 +20,7 @@
 package org.apache.samza.system.inmemory;
 
 import com.google.common.base.Preconditions;
+import java.util.Arrays;
 import org.apache.samza.Partition;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -90,7 +91,7 @@ public class InMemorySystemProducer implements SystemProducer {
     Preconditions.checkNotNull(partitionKey, "Failed to compute partition key for the message: " + envelope);
 
     int partition =
-        Math.abs(partitionKey.hashCode()) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream());
+        Math.abs(hashCode(partitionKey) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream()));
 
     SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partition));
     memoryManager.put(ssp, key, message);
@@ -121,4 +122,12 @@ public class InMemorySystemProducer implements SystemProducer {
   public void flush(String source) {
     // nothing to do
   }
+
+  /**
+   * Returns partitionKey.hashCode(), except that {@code byte[]} keys are hashed by content (Arrays.hashCode) so that
+   * arrays with equal contents hash equally. May return Integer.MIN_VALUE, for which Math.abs is still negative.
+   */
+  private int hashCode(Object partitionKey) {
+    return (partitionKey instanceof byte[]) ? Arrays.hashCode((byte[]) partitionKey) : partitionKey.hashCode();
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemProducer.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemProducer.java
new file mode 100644
index 0000000..f158695
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemProducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestInMemorySystemProducer {
+
+  @Mock
+  private InMemoryManager inMemoryManager;
+
+  private InMemorySystemProducer inMemorySystemProducer;
+  private boolean testFinished; // set once the second put() call has been checked
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.inMemorySystemProducer = new InMemorySystemProducer("systemName", this.inMemoryManager);
+    this.testFinished = false;
+  }
+
+  /**
+   * Verifies that two distinct {@code byte[]} keys with the same contents go to the same partition (SAMZA-2502).
+   */
+  @Test
+  public void testPartition() {
+    doReturn(1000).when(inMemoryManager).getPartitionCountForSystemStream(any());
+    doAnswer(new Answer<Void>() {
+      int partitionOfFirstMessage = -1; // -1 means no put() has been observed yet
+      int partitionOfSecondMessage = -2; // placeholder; overwritten before it is compared
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        SystemStreamPartition ssp = invocation.getArgumentAt(0, SystemStreamPartition.class);
+        if (partitionOfFirstMessage == -1) {
+          partitionOfFirstMessage = ssp.getPartition().getPartitionId();
+        } else {
+          partitionOfSecondMessage = ssp.getPartition().getPartitionId();
+          Assert.assertEquals(partitionOfFirstMessage, partitionOfSecondMessage);
+          testFinished = true;
+        }
+        return null;
+      }
+    }).when(inMemoryManager).put(any(), any(), any());
+
+    byte[] key1 = new byte[]{1, 2, 3};
+    byte[] key2 = new byte[]{1, 2, 3}; // same contents as key1, but a different array instance
+    SystemStream systemStream = new SystemStream("TestSystem", "TestStream");
+    OutgoingMessageEnvelope outgoingMessageEnvelope1 = new OutgoingMessageEnvelope(systemStream, key1, null);
+    OutgoingMessageEnvelope outgoingMessageEnvelope2 = new OutgoingMessageEnvelope(systemStream, key2, null);
+    inMemorySystemProducer.send("TestSource", outgoingMessageEnvelope1);
+    inMemorySystemProducer.send("TestSource", outgoingMessageEnvelope2);
+    Assert.assertTrue(testFinished); // fails if put() was invoked fewer than two times
+  }
+}

Reply via email to