This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d0b91c5 SAMZA-2291: Custom IME Put in InMemoryManager runs into
Offset mismatch even if offsets are correctly set (#1127)
d0b91c5 is described below
commit d0b91c53780e9766e233a9b4a605f513b14e65e2
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Aug 6 11:12:25 2019 -0700
SAMZA-2291: Custom IME Put in InMemoryManager runs into Offset mismatch
even if offsets are correctly set (#1127)
* SAMZA-2291: Custom IME Put in TestRunner runs into Offset mismatch even
on correct offsets
* Addressing comments
---
.../main/java/org/apache/samza/system/inmemory/InMemoryManager.java | 5 ++++-
.../org/apache/samza/test/framework/StreamTaskIntegrationTest.java | 2 +-
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 2c2e9ae..2a32583 100644
---
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -19,6 +19,7 @@
package org.apache.samza.system.inmemory;
+import com.google.common.base.Preconditions;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -86,9 +87,11 @@ class InMemoryManager {
* @param envelope incoming message envelope
*/
void put(SystemStreamPartition ssp, IncomingMessageEnvelope envelope) {
+ Preconditions.checkNotNull(envelope);
+ Preconditions.checkNotNull(envelope.getOffset());
List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
String offset = String.valueOf(messages.size());
- if (envelope.getOffset().equals(offset)) {
+ if (!envelope.getOffset().equals(offset)) {
throw new SamzaException(
String.format("Offset mismatch for ssp %s, expected %s found %s,
please set the correct offset", ssp, offset, envelope.getOffset()));
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index 2a58273..50e49d4 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -214,7 +214,7 @@ public class StreamTaskIntegrationTest {
inputPartitionIME.get(partitionId).add(KV.of(message.key, ime));
}
}
- syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionData,
expectedOutputPartitionData);
+ syncTaskWithMultiplePartitionMultithreadedHelper(inputPartitionIME,
expectedOutputPartitionData);
}
void syncTaskWithMultiplePartitionMultithreadedHelper(Map<Integer, List<KV>>
inputPartitionData,