This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d0b91c5  SAMZA-2291: Custom IME Put in InMemoryManager runs into Offset mismatch even if offsets are correctly set (#1127)
d0b91c5 is described below

commit d0b91c53780e9766e233a9b4a605f513b14e65e2
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Aug 6 11:12:25 2019 -0700

    SAMZA-2291: Custom IME Put in InMemoryManager runs into Offset mismatch even if offsets are correctly set (#1127)
    
    * SAMZA-2291: Custom IME Put in TestRunner runs into Offset mismatch even on correct offsets
    
    * Addressing comments
---
 .../main/java/org/apache/samza/system/inmemory/InMemoryManager.java  | 5 ++++-
 .../org/apache/samza/test/framework/StreamTaskIntegrationTest.java   | 2 +-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 2c2e9ae..2a32583 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system.inmemory;
 
+import com.google.common.base.Preconditions;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -86,9 +87,11 @@ class InMemoryManager {
    * @param envelope incoming message envelope
    */
   void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+    Preconditions.checkNotNull(envelope);
+    Preconditions.checkNotNull(envelope.getOffset());
     List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
     String offset = String.valueOf(messages.size());
-    if (envelope.getOffset().equals(offset)) {
+    if (!envelope.getOffset().equals(offset)) {
       throw new SamzaException(
           String.format("Offset mismatch for ssp %s, expected %s found %s, please set the correct offset", ssp, offset, envelope.getOffset()));
     }
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2a58273..50e49d4 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -214,7 +214,7 @@ public class StreamTaskIntegrationTest {
         inputPartitionIME.get(partitionId).add(KV.of(message.key, ime));
       }
     }
-    syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionData, expectedOutputPartitionData);
+    syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME, expectedOutputPartitionData);
   }
 
   void syncTaskWithMultiplePartitionMultithreadedHelper(Map<Integer, List<KV>> inputPartitionData,

Reply via email to