Repository: samza Updated Branches: refs/heads/master 7836bf08c -> ea5887178
SAMZA-1971: Fix NPE in partition key computation for InMemorySystemProducer Author: bharathkk <[email protected]> Reviewers: Jagadish<[email protected]> Closes #786 from bharathkk/fix-inmemory-partitionkey-npe Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ea588717 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ea588717 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ea588717 Branch: refs/heads/master Commit: ea5887178a63fc0ccfaf50bb3d5f826145b9c509 Parents: 7836bf0 Author: bharathkk <[email protected]> Authored: Wed Oct 31 12:41:20 2018 -0700 Committer: Jagadish <[email protected]> Committed: Wed Oct 31 12:41:20 2018 -0700 ---------------------------------------------------------------------- .../system/inmemory/InMemorySystemProducer.java | 22 ++++++++++----- .../system/inmemory/TestInMemorySystem.java | 28 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java index cd5e649..872488d 100644 --- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java +++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemProducer.java @@ -19,7 +19,7 @@ package org.apache.samza.system.inmemory; -import java.util.Optional; +import com.google.common.base.Preconditions; import org.apache.samza.Partition; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; @@ -75,11 +75,21 @@ public class InMemorySystemProducer implements SystemProducer { Object key = envelope.getKey(); Object message = envelope.getMessage(); - // use the hashcode from partition key in the outgoing message envelope or default to message hashcode - int hashCode = Optional.ofNullable(envelope.getPartitionKey()) - .map(Object::hashCode) - .orElse(message.hashCode()); - int partition = Math.abs(hashCode) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream()); + Object partitionKey; + // We use the partition key from message if available, if not fallback to message key or use message as partition + // key as the final resort. + if (envelope.getPartitionKey() != null) { + partitionKey = envelope.getPartitionKey(); + } else if (key != null) { + partitionKey = key; + } else { + partitionKey = message; + } + + Preconditions.checkNotNull(partitionKey, "Failed to compute partition key for the message: " + envelope); + + int partition = + Math.abs(partitionKey.hashCode()) % memoryManager.getPartitionCountForSystemStream(envelope.getSystemStream()); SystemStreamPartition ssp = new SystemStreamPartition(envelope.getSystemStream(), new Partition(partition)); memoryManager.put(ssp, key, message); http://git-wip-us.apache.org/repos/asf/samza/blob/ea588717/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java index 7d5dfd0..0a2e221 100644 --- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java +++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java @@ -142,6 +142,34 @@ public class TestInMemorySystem { assertTrue(results.get(0).isEndOfStream()); } + @Test + public void testNullMessageWithValidMessageKey() { + final String messageKey = "validKey"; + SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry); + systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, messageKey, null)); + + SystemConsumer consumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry); + + Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT) + .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition))) + .collect(Collectors.toSet()); + + // register the consumer for ssps + for (SystemStreamPartition ssp : sspsToPoll) { + consumer.register(ssp, "0"); + } + + List<IncomingMessageEnvelope> results = consumeRawMessages(consumer, sspsToPoll); + assertEquals(1, results.size()); + assertEquals(results.get(0).getKey(), messageKey); + assertNull(results.get(0).getMessage()); + } + + @Test(expected = NullPointerException.class) + public void testNullMessageWithNullKey() { + SystemProducer systemProducer = systemFactory.getProducer(SYSTEM_NAME, config, mockRegistry); + systemProducer.send(SOURCE, new OutgoingMessageEnvelope(SYSTEM_STREAM, null)); + } private <T> List<T> consumeMessages(Set<SystemStreamPartition> sspsToPoll) { SystemConsumer systemConsumer = systemFactory.getConsumer(SYSTEM_NAME, config, mockRegistry);
