This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 37de270 SAMZA-2297: InMemorySystemAdmin offsets are off-by-one in some cases (#1133)
37de270 is described below
commit 37de270bce27defca691891a93517c8d694057c8
Author: Cameron Lee <[email protected]>
AuthorDate: Mon Aug 12 17:39:54 2019 -0700
SAMZA-2297: InMemorySystemAdmin offsets are off-by-one in some cases (#1133)
---
.../samza/system/inmemory/InMemoryManager.java | 7 +-
.../samza/system/inmemory/InMemorySystemAdmin.java | 12 +-
.../system/inmemory/InMemorySystemConsumer.java | 21 +++-
.../samza/system/inmemory/TestInMemoryManager.java | 133 +++++++++++++++++++++
.../system/inmemory/TestInMemorySystemAdmin.java | 50 ++++++++
.../inmemory/TestInMemorySystemConsumer.java | 97 +++++++++++++++
.../apache/samza/test/framework/TestRunner.java | 4 +-
7 files changed, 312 insertions(+), 12 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 2a32583..f53e0b3 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -179,9 +179,10 @@ class InMemoryManager {
             .entrySet()
             .stream()
             .collect(Collectors.toMap(entry -> entry.getKey().getPartition(), entry -> {
-                String oldestOffset = "0";
-                String newestOffset = String.valueOf(entry.getValue().size());
-                String upcomingOffset = String.valueOf(entry.getValue().size() + 1);
+                List<IncomingMessageEnvelope> messages = entry.getValue();
+                String oldestOffset = messages.isEmpty() ? null : "0";
+                String newestOffset = messages.isEmpty() ? null : String.valueOf(messages.size() - 1);
+                String upcomingOffset = String.valueOf(messages.size());
 
                 return new SystemStreamMetadata.SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset);
 
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
index 8e5f732..cb5478c 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
@@ -90,11 +90,15 @@ public class InMemorySystemAdmin implements SystemAdmin {
    */
   @Override
   public Integer offsetComparator(String offset1, String offset2) {
-    if (offset1 == null || offset2 == null) {
-      return null;
+    if (offset1 == null && offset2 == null) {
+      return 0;
+    } else if (offset1 == null) {
+      return -1;
+    } else if (offset2 == null) {
+      return 1;
+    } else {
+      return Integer.compare(Integer.parseInt(offset1), Integer.parseInt(offset2));
     }
-
-    return Integer.compare(Integer.parseInt(offset1), Integer.parseInt(offset2));
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
index dcab001..2c02b79 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemConsumer.java
@@ -65,7 +65,10 @@ public class InMemorySystemConsumer implements SystemConsumer {
    * should try and read messages from all SystemStreamPartitions that are
    * registered to it. SystemStreamPartitions should only be registered before
    * start is called.
-   * @param systemStreamPartition
+   *
+   * For this implementation, if the offset is null, then consumption will start with the oldest offset.
+   *
+   * @param systemStreamPartition
    *          The SystemStreamPartition object representing the Samza
    *          SystemStreamPartition to receive messages from.
    * @param offset
@@ -74,11 +77,23 @@ public class InMemorySystemConsumer implements SystemConsumer {
    *          specified, the first message for the system/stream/partition to be
    *          consumed and returned would be a message whose offset is "7".
    * Note: For broadcast streams, different tasks may checkpoint the same ssp with different values. It
+   * is the system's responsibility to select the lowest one.
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
-    LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, offset);
-    sspToOffset.put(systemStreamPartition, offset);
+    String offsetToRegister;
+    if (offset == null) {
+      /*
+       * A null offset is the same as the oldest offset of the stream. Can't use null directly since ConcurrentHashMap
+       * doesn't allow putting null values.
+       */
+      LOG.info("Registering ssp {} with starting offset null, overriding to 0", systemStreamPartition);
+      offsetToRegister = "0";
+    } else {
+      LOG.info("Registering ssp {} with starting offset {}", systemStreamPartition, offset);
+      offsetToRegister = offset;
+    }
+    sspToOffset.put(systemStreamPartition, offsetToRegister);
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
new file mode 100644
index 0000000..7a32483
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemoryManager.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestInMemoryManager {
+  private static final String SYSTEM = "system";
+  private static final String STREAM0 = "stream0";
+  private static final String STREAM1 = "stream1";
+
+  private InMemoryManager inMemoryManager;
+
+  @Before
+  public void setup() {
+    this.inMemoryManager = new InMemoryManager();
+  }
+
+  @Test
+  public void testGetSystemStreamMetadata() {
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM0, STREAM0, SYSTEM, 1));
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM1, STREAM1, SYSTEM, 1));
+    // add some other stream which we won't request metadata for
+    this.inMemoryManager.initializeStream(new StreamSpec("otherStream", "otherStream", SYSTEM, 1));
+
+    // empty stream
+    SystemStreamMetadata systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, "0")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
+
+    // add a message in
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM, STREAM0, new Partition(0));
+    this.inMemoryManager.put(ssp0, "key00", "message00");
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0)));
+
+    // add a second message to the first stream and add one message to the second stream
+    this.inMemoryManager.put(ssp0, "key01", "message01");
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, STREAM1, new Partition(0));
+    this.inMemoryManager.put(ssp1, "key10", "message10");
+    systemStreamMetadata0 = new SystemStreamMetadata(STREAM0,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2")));
+    SystemStreamMetadata systemStreamMetadata1 = new SystemStreamMetadata(STREAM1,
+        ImmutableMap.of(new Partition(0), new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "0", "1")));
+    // also test a batch call for multiple streams here
+    assertEquals(ImmutableMap.of(STREAM0, systemStreamMetadata0, STREAM1, systemStreamMetadata1),
+        this.inMemoryManager.getSystemStreamMetadata(SYSTEM, ImmutableSet.of(STREAM0, STREAM1)));
+  }
+
+  @Test
+  public void testPoll() {
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM0, STREAM0, SYSTEM, 1));
+    this.inMemoryManager.initializeStream(new StreamSpec(STREAM1, STREAM1, SYSTEM, 1));
+    // add some other stream which we won't request metadata for
+    this.inMemoryManager.initializeStream(new StreamSpec("otherStream", "otherStream", SYSTEM, 1));
+
+    // empty stream
+    SystemStreamPartition ssp0 = new SystemStreamPartition(SYSTEM, STREAM0, new Partition(0));
+    assertEquals(ImmutableMap.of(ssp0, ImmutableList.of()),
+        this.inMemoryManager.poll(Collections.singletonMap(ssp0, "0")));
+
+    // add a message in
+    this.inMemoryManager.put(ssp0, "key00", "message00");
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> polledMessages =
+        this.inMemoryManager.poll(Collections.singletonMap(ssp0, "0"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key00", "message00", "0", ssp0, polledMessages.get(ssp0).get(0));
+
+    // add a second message to the first stream
+    this.inMemoryManager.put(ssp0, "key01", "message01");
+    // verify multiple messages returned
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "0"));
+    assertEquals(2, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key00", "message00", "0", ssp0, polledMessages.get(ssp0).get(0));
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(1));
+    // make sure only read messages starting from the offset that is not the oldest offset
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "1"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(0));
+
+    // add a message to the second stream to test a batch call
+    SystemStreamPartition ssp1 = new SystemStreamPartition(SYSTEM, STREAM1, new Partition(0));
+    this.inMemoryManager.put(ssp1, "key10", "message10");
+    polledMessages = this.inMemoryManager.poll(ImmutableMap.of(ssp0, "1", ssp1, "0"));
+    assertEquals(1, polledMessages.get(ssp0).size());
+    assertIncomingMessageEnvelope("key01", "message01", "1", ssp0, polledMessages.get(ssp0).get(0));
+    assertEquals(1, polledMessages.get(ssp1).size());
+    assertIncomingMessageEnvelope("key10", "message10", "0", ssp1, polledMessages.get(ssp1).get(0));
+  }
+
+  private static void assertIncomingMessageEnvelope(String expectedKey, String expectedMessage, String expectedOffset,
+      SystemStreamPartition expectedSystemStreamPartition, IncomingMessageEnvelope actualIncomingMessageEnvelope) {
+    assertEquals(expectedKey, actualIncomingMessageEnvelope.getKey());
+    assertEquals(expectedMessage, actualIncomingMessageEnvelope.getMessage());
+    assertEquals(expectedOffset, actualIncomingMessageEnvelope.getOffset());
+    assertEquals(expectedSystemStreamPartition, actualIncomingMessageEnvelope.getSystemStreamPartition());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java
new file mode 100644
index 0000000..da774c4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemAdmin.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestInMemorySystemAdmin {
+  @Mock
+  private InMemoryManager inMemoryManager;
+
+  private InMemorySystemAdmin inMemorySystemAdmin;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.inMemorySystemAdmin = new InMemorySystemAdmin("system", this.inMemoryManager);
+  }
+
+  @Test
+  public void testOffsetComparator() {
+    assertEquals(0, inMemorySystemAdmin.offsetComparator(null, null).intValue());
+    assertEquals(-1, inMemorySystemAdmin.offsetComparator(null, "0").intValue());
+    assertEquals(1, inMemorySystemAdmin.offsetComparator("0", null).intValue());
+    assertEquals(-1, inMemorySystemAdmin.offsetComparator("0", "1").intValue());
+    assertEquals(0, inMemorySystemAdmin.offsetComparator("0", "0").intValue());
+    assertEquals(1, inMemorySystemAdmin.offsetComparator("1", "0").intValue());
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java
new file mode 100644
index 0000000..6278045
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystemConsumer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.inmemory;
+
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+
+public class TestInMemorySystemConsumer {
+  private static final SystemStreamPartition SSP0 = new SystemStreamPartition("system", "stream", new Partition(0));
+  private static final SystemStreamPartition SSP1 = new SystemStreamPartition("system", "stream", new Partition(1));
+
+  @Mock
+  private InMemoryManager inMemoryManager;
+
+  private InMemorySystemConsumer inMemorySystemConsumer;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.inMemorySystemConsumer = new InMemorySystemConsumer(this.inMemoryManager);
+  }
+
+  @Test
+  public void testPoll() throws InterruptedException {
+    this.inMemorySystemConsumer.register(SSP0, "1");
+    this.inMemorySystemConsumer.register(SSP1, "1");
+
+    IncomingMessageEnvelope ime01 = new IncomingMessageEnvelope(SSP0, "1", "key01", "message01");
+    IncomingMessageEnvelope ime02 = new IncomingMessageEnvelope(SSP0, "2", "key02", "message02");
+    Map<SystemStreamPartition, String> pollRequest = ImmutableMap.of(SSP0, "1");
+    when(this.inMemoryManager.poll(pollRequest))
+        // poll for SSP0 only, return no messages
+        .thenReturn(ImmutableMap.of(SSP0, ImmutableList.of()))
+        // poll for SSP0 only, return some messages; still same offset request since got no messages last time
+        .thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime01, ime02)));
+    // poll for SSP0 and SSP1; SSP0 should have a new offset now
+    pollRequest = ImmutableMap.of(SSP0, "3", SSP1, "1");
+    IncomingMessageEnvelope ime03 = new IncomingMessageEnvelope(SSP0, "3", "key03", "message03");
+    IncomingMessageEnvelope ime10 = new IncomingMessageEnvelope(SSP1, "1", "key10", "message10");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(
+        ImmutableMap.of(SSP0, ImmutableList.of(ime03), SSP1, ImmutableList.of(ime10)));
+
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of()),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime01, ime02)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime03), SSP1, ImmutableList.of(ime10)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0, SSP1), 1000));
+  }
+
+  @Test
+  public void testPollRegisterNullOffset() throws InterruptedException {
+    this.inMemorySystemConsumer.register(SSP0, null);
+
+    IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(SSP0, "0", "key0", "message0");
+    IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(SSP0, "1", "key1", "message1");
+    Map<SystemStreamPartition, String> pollRequest = ImmutableMap.of(SSP0, "0");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime0)));
+    pollRequest = ImmutableMap.of(SSP0, "1");
+    when(this.inMemoryManager.poll(pollRequest)).thenReturn(ImmutableMap.of(SSP0, ImmutableList.of(ime1)));
+
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime0)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+    assertEquals(ImmutableMap.of(SSP0, ImmutableList.of(ime1)),
+        this.inMemorySystemConsumer.poll(ImmutableSet.of(SSP0), 1000));
+  }
+}
\ No newline at end of file
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 4d70017..082b727 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -339,10 +339,10 @@ public class TestRunner {
           SystemStreamPartition ssp = entry.getKey();
           output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
           List<IncomingMessageEnvelope> currentBuffer = entry.getValue();
-          Integer totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
+          int totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
               .getSystemStreamPartitionMetadata()
               .get(ssp.getPartition())
-              .getNewestOffset());
+              .getUpcomingOffset());
           if (output.get(ssp).size() + currentBuffer.size() == totalMessagesToFetch) {
             didNotReachEndOfStream.remove(entry.getKey());
             ssps.remove(entry.getKey());