This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a7760a  SAMZA-2506: Inconsistent end of stream semantics in 
SystemStreamPartitionMetadata (#1345)
0a7760a is described below

commit 0a7760a55be2b6089088016c55217814b5c9675f
Author: bkonold <[email protected]>
AuthorDate: Mon May 18 18:28:19 2020 -0700

    SAMZA-2506: Inconsistent end of stream semantics in 
SystemStreamPartitionMetadata (#1345)
---
 .../samza/system/inmemory/InMemoryManager.java     | 37 ++++++++++++++++++----
 .../samza/system/inmemory/TestInMemoryManager.java |  9 ++++++
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index f3028f9..9055819 100644
--- 
a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -181,12 +181,37 @@ class InMemoryManager {
             .stream()
             .collect(Collectors.toMap(entry -> entry.getKey().getPartition(), 
entry -> {
                 List<IncomingMessageEnvelope> messages = entry.getValue();
-                String oldestOffset = messages.isEmpty() ? null : "0";
-                String newestOffset = messages.isEmpty() ? null : 
String.valueOf(messages.size() - 1);
-                String upcomingOffset = String.valueOf(messages.size());
-
-                return new 
SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, 
upcomingOffset);
-
+                Integer oldestOffset;
+                Integer newestOffset;
+                int upcomingOffset;
+
+                if (messages.isEmpty()) {
+                  oldestOffset = null;
+                  newestOffset = null;
+                  upcomingOffset = 0;
+                } else if (messages.get(messages.size() - 1).isEndOfStream()) {
+                  if (messages.size() > 1) {
+                    // don't count end of stream in offset indices
+                    oldestOffset = 0;
+                    newestOffset = messages.size() - 2;
+                    upcomingOffset = messages.size() - 1;
+                  } else {
+                    // end of stream is the only message, treat the same as 
empty
+                    oldestOffset = null;
+                    newestOffset = null;
+                    upcomingOffset = 0;
+                  }
+                } else {
+                  // offsets correspond strictly to numeric indices
+                  oldestOffset = 0;
+                  newestOffset = messages.size() - 1;
+                  upcomingOffset = messages.size();
+                }
+
+                return new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                    oldestOffset == null ? null : oldestOffset.toString(),
+                    newestOffset == null ? null : newestOffset.toString(),
+                    Integer.toString(upcomingOffset));
               }));
 
     return new SystemStreamMetadata(streamName, partitionMetadata);
diff --git 
a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
 
b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
index 7a32483..890b669 100644
--- 
a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -77,9 +78,17 @@ public class TestInMemoryManager {
         ImmutableMap.of(new Partition(0), new 
SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
     SystemStreamMetadata systemStreamMetadata1 = new 
SystemStreamMetadata(STREAM1,
         ImmutableMap.of(new Partition(0), new 
SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+
     // also test a batch call for multiple streams here
     assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0, STREAM1, 
systemStreamMetadata1),
         this.inMemoryManager.getSystemStreamMetadata(SYSTEM, 
ImmutableSet.of(STREAM0, STREAM1)));
+
+    // test END_OF_STREAM doesn't alter new or upcoming offset
+    this.inMemoryManager.put(ssp0, "key02", new EndOfStreamMessage());
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new 
SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, 
ImmutableSet.of(STREAM0)));
   }
 
   @Test

Reply via email to