This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 37de270  SAMZA-2297: InMemorySystemAdmin offsets are off-by-one in some cases  (#1133)
37de270 is described below

commit 37de270bce27defca691891a93517c8d694057c8
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 12 17:39:54 2019 -0700

    SAMZA-2297: InMemorySystemAdmin offsets are off-by-one in some cases  (#1133)
---
 .../samza/system/inmemory/InMemoryManager.java     |   7 +-
 .../samza/system/inmemory/InMemorySystemAdmin.java |  12 +-
 .../system/inmemory/InMemorySystemConsumer.java    |  21 +++-
 .../samza/system/inmemory/TestInMemoryManager.java | 133 +++++++++++++++++++++
 .../system/inmemory/TestInMemorySystemAdmin.java   |  50 ++++++++
 .../inmemory/TestInMemorySystemConsumer.java       |  97 +++++++++++++++
 .../apache/samza/test/framework/TestRunner.java    |   4 +-
 7 files changed, 312 insertions(+), 12 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 2a32583..f53e0b3 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -179,9 +179,10 @@ class InMemoryManager {
             .entrySet()
             .stream()
             .collect(Collectors.toMap(entry -> entry.getKey().getPartition(), entry -> {
-                String oldestOffset = "0";
-                String newestOffset = String.valueOf(entry.getValue().size());
-                String upcomingOffset = String.valueOf(entry.getValue().size() + 1);
+                List<IncomingMessageEnvelope> messages = entry.getValue();
+                String oldestOffset = messages.isEmpty() ? null : "0";
+                String newestOffset = messages.isEmpty() ? null : String.valueOf(messages.size() - 1);
+                String upcomingOffset = String.valueOf(messages.size());
 
                 return new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset);
 
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
index 8e5f732..cb5478c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
@@ -90,11 +90,15 @@ public class InMemorySystemAdmin implements SystemAdmin {
    */
   @Override
   public Integer offsetComparator(String offset1, String offset2) {
-    if (offset1 == null || offset2 == null) {
-      return null;
+    if (offset1 == null && offset2 == null) {
+      return 0;
+    } else if (offset1 == null) {
+      return -1;
+    } else if (offset2 == null) {
+      return 1;
+    } else {
+      return Integer.compare(Integer.parseInt(offset1), Integer.parseInt(offset2));
     }
-
-    return Integer.compare(Integer.parseInt(offset1), Integer.parseInt(offset2));
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
index dcab001..2c02b79 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
@@ -65,7 +65,10 @@ public class InMemorySystemConsumer implements SystemConsumer {
    * should try and read messages from all SystemStreamPartitions that are
    * registered to it. SystemStreamPartitions should only be registered before
    * start is called.
-   *  @param systemStreamPartition
+   *
+   * For this implementation, if the offset is null, then consumption will start with the oldest offset.
+   *
+   * @param systemStreamPartition
    *          The SystemStreamPartition object representing the Samza
    *          SystemStreamPartition to receive messages from.
    * @param offset
@@ -74,11 +77,23 @@ public class InMemorySystemConsumer implements SystemConsumer {
    *          specified, the first message for the system/stream/partition to be
    *          consumed and returned would be a message whose offset is "7".
    *          Note: For broadcast streams, different tasks may checkpoint the same ssp with different values. It
+   *          is the system's responsibility to select the lowest one.
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
-    LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, offset);
-    sspToOffset.put(systemStreamPartition, offset);
+    String offsetToRegister;
+    if (offset == null) {
+      /*
+       * A null offset is the same as the oldest offset of the stream. Can't use null directly since ConcurrentHashMap
+       * doesn't allow putting null values.
+       */
+      LOG.info("Registering ssp {} with starting offset null, overriding to 0", systemStreamPartition);
+      offsetToRegister = "0";
+    } else {
+      LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, offset);
+      offsetToRegister = offset;
+    }
+    sspToOffset.put(systemStreamPartition, offsetToRegister);
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
new file mode 100644
index 0000000..7a32483
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestInMemoryManager {
+  private static final String SYSTEM = "system";
+  private static final String STREAM0 = "stream0";
+  private static final String STREAM1 = "stream1";
+
+  private InMemoryManager inMemoryManager;
+
+  @Before
+  public void setup() {
+    this.inMemoryManager = new InMemoryManager();
+  }
+
+  @Test
+  public void testGetSystemStreamMetadata() {
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM0, STREAM0, SYSTEM, 1));
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM1, STREAM1, SYSTEM, 1));
+    // add some other stream which we won't request metadata for
+    this.inMemoryManager.initializeStream(new StreamSpec("otherStream", "otherStream", SYSTEM, 1));
+
+    // empty stream
+    SystemStreamMetadata systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, "0")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
+
+    // add a message in
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM, STREAM0, new Partition(0));
+    this.inMemoryManager.put(ssp0, "key00", "message00");
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
+
+    // add a second message to the first stream and add one message to the second stream
+    this.inMemoryManager.put(ssp0, "key01", "message01");
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, STREAM1, new Partition(0));
+    this.inMemoryManager.put(ssp1, "key10", "message10");
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
+    SystemStreamMetadata systemStreamMetadata1 = new SystemStreamMetadata(STREAM1,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+    // also test a batch call for multiple streams here
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0, STREAM1, systemStreamMetadata1),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0, STREAM1)));
+  }
+
+  @Test
+  public void testPoll() {
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM0, STREAM0, SYSTEM, 1));
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM1, STREAM1, SYSTEM, 1));
+    // add some other stream which we won't request metadata for
+    this.inMemoryManager.initializeStream(new StreamSpec("otherStream", "otherStream", SYSTEM, 1));
+
+    // empty stream
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM, STREAM0, new Partition(0));
+    assertEquals(ImmutableMap.of(ssp0, ImmutableList.of()),
+        this.inMemoryManager.poll(Collections.singletonMap(ssp0, "0")));
+
+    // add a message in
+    this.inMemoryManager.put(ssp0, "key00", "message00");
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polledMessages =
+        this.inMemoryManager.poll(Collections.singletonMap(ssp0, "0"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key00", "message00", "0", ssp0, polledMessages.get(ssp0).get(0));
+
+    // add a second message to the first stream
+    this.inMemoryManager.put(ssp0, "key01", "message01");
+    // verify multiple messages returned
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "0"));
+    assertEquals(2, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key00", "message00", "0", ssp0, polledMessages.get(ssp0).get(0));
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(1));
+    // make sure only read messages starting from the offset that is not the oldest offset
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "1"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(0));
+
+    // add a message to the second stream to test a batch call
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, STREAM1, new Partition(0));
+    this.inMemoryManager.put(ssp1, "key10", "message10");
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "1", ssp1, "0"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(0));
+    assertEquals(1, polledMessages.get(ssp1).size());
+    assertIncomingMessageEnvelope("key10", "message10", "0", ssp1, polledMessages.get(ssp1).get(0));
+  }
+
+  private static void assertIncomingMessageEnvelope(String expectedKey, String expectedMessage, String expectedOffset,
+      SystemStreamPartition expectedSystemStreamPartition, IncomingMessageEnvelope actualIncomingMessageEnvelope) {
+    assertEquals(expectedKey, actualIncomingMessageEnvelope.getKey());
+    assertEquals(expectedMessage, actualIncomingMessageEnvelope.getMessage());
+    assertEquals(expectedOffset, actualIncomingMessageEnvelope.getOffset());
+    assertEquals(expectedSystemStreamPartition, actualIncomingMessageEnvelope.getSystemStreamPartition());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java
new file mode 100644
index 0000000..da774c4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestInMemorySystemAdmin {
+  @Mock
+  private InMemoryManager inMemoryManager;
+
+  private InMemorySystemAdmin inMemorySystemAdmin;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.inMemorySystemAdmin = new InMemorySystemAdmin("system", this.inMemoryManager);
+  }
+
+  @Test
+  public void testOffsetComparator() {
+    assertEquals(0, inMemorySystemAdmin.offsetComparator(null, null).intValue());
+    assertEquals(-1, inMemorySystemAdmin.offsetComparator(null, "0").intValue());
+    assertEquals(1, inMemorySystemAdmin.offsetComparator("0", null).intValue());
+    assertEquals(-1, inMemorySystemAdmin.offsetComparator("0", "1").intValue());
+    assertEquals(0, inMemorySystemAdmin.offsetComparator("0", "0").intValue());
+    assertEquals(1, inMemorySystemAdmin.offsetComparator("1", "0").intValue());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java
new file mode 100644
index 0000000..6278045
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+public class TestInMemorySystemConsumer {
+  private static final SystemStreamPartition SSP0 = new SystemStreamPartition("system", "stream", new Partition(0));
+  private static final SystemStreamPartition SSP1 = new SystemStreamPartition("system", "stream", new Partition(1));
+
+  @Mock
+  private InMemoryManager inMemoryManager;
+
+  private InMemorySystemConsumer inMemorySystemConsumer;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.inMemorySystemConsumer = new InMemorySystemConsumer(this.inMemoryManager);
+  }
+
+  @Test
+  public void testPoll() throws InterruptedException {
+    this.inMemorySystemConsumer.register(SSP0, "1");
+    this.inMemorySystemConsumer.register(SSP1, "1");
+
+    IncomingMessageEnvelope ime01 = new IncomingMessageEnvelope(SSP0, "1", "key01", "message01");
+    IncomingMessageEnvelope ime02 = new IncomingMessageEnvelope(SSP0, "2", "key02", "message02");
+    Map<SystemStreamPartition, String> pollRequest = ImmutableMap.of(SSP0, "1");
+    when(this.inMemoryManager.poll(pollRequest))
+        // poll for SSP0 only, return no messages
+        .thenReturn(ImmutableMap.of(SSP0, ImmutableList.of()))
+        // poll for SSP0 only, return some messages; still same offset request since got no messages last time
+        .thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime01, ime02)));
+    // poll for SSP0 and SSP1; SSP0 should have a new offset now
+    pollRequest = ImmutableMap.of(SSP0, "3", SSP1, "1");
+    IncomingMessageEnvelope ime03 = new IncomingMessageEnvelope(SSP0, "3", "key03", "message03");
+    IncomingMessageEnvelope ime10 = new IncomingMessageEnvelope(SSP1, "1", "key10", "message10");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(
+        ImmutableMap.of(SSP0, ImmutableList.of(ime03), SSP1, ImmutableList.of(ime10)));
+
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of()),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime01, ime02)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime03), SSP1, ImmutableList.of(ime10)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0, SSP1), 1000));
+  }
+
+  @Test
+  public void testPollRegisterNullOffset() throws InterruptedException {
+    this.inMemorySystemConsumer.register(SSP0, null);
+
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(SSP0, "0", "key0", "message0");
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(SSP0, "1", "key1", "message1");
+    Map<SystemStreamPartition, String> pollRequest = ImmutableMap.of(SSP0, "0");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime0)));
+    pollRequest = ImmutableMap.of(SSP0, "1");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime1)));
+
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime0)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime1)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+  }
+}
\ No newline at end of file
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 4d70017..082b727 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -339,10 +339,10 @@ public class TestRunner {
         SystemStreamPartition ssp = entry.getKey();
         output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
         List<IncomingMessageEnvelope> currentBuffer = entry.getValue();
-        Integer totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
+        int totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
             .getSystemStreamPartitionMetadata()
             .get(ssp.getPartition())
-            .getNewestOffset());
+            .getUpcomingOffset());
         if (output.get(ssp).size() + currentBuffer.size() == totalMessagesToFetch) {
           didNotReachEndOfStream.remove(entry.getKey());
           ssps.remove(entry.getKey());

Reply via email to