This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7ac08b [pulsar-broker] Stop to dispatch when skip message temporally
since Key_Shared consumer stuck on delivery (#7553)
c7ac08b is described below
commit c7ac08b13e95cd7bbbcf551d2112e9131153bd83
Author: Yuri Mizushima <[email protected]>
AuthorDate: Wed Sep 2 23:36:29 2020 +0900
[pulsar-broker] Stop to dispatch when skip message temporally since
Key_Shared consumer stuck on delivery (#7553)
### Motivation
In some cases with a Key_Shared consumer, message ordering was broken.
Here is how to reproduce it (I think this is one of the cases that reproduces this issue):
1. Connect Consumer1 to the Key_Shared subscription `sub` and stop receiving
- receiverQueueSize: 500
2. Connect a Producer and publish 500 messages with key `(i % 10)`
3. Connect Consumer2 to the same subscription and start receiving
- receiverQueueSize: 1
- since https://github.com/apache/pulsar/pull/7106, Consumer2 can't receive messages (expected)
4. The Producer publishes 500 more messages with the same key generation algorithm
5. After that, Consumer1 starts receiving
6. Check Consumer2's message ordering
- sometimes message ordering was broken within the same key
Consumer1:
```
Connected: Tue Jul 14 09:36:39 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
[pulsar-timer-4-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [persistent://public/default/key-shared-test] [sub0] [820f0] Prefetched messages: 499 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
Received: my-message-0 PublishTime: 1594687006203 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-1 PublishTime: 1594687006243 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-2 PublishTime: 1594687006247 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-498 PublishTime: 1594687008727 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-499 PublishTime: 1594687008731 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-500 PublishTime: 1594687038742 Date: Tue Jul 14 09:37:46 JST 2020
...
Received: my-message-990 PublishTime: 1594687040094 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-994 PublishTime: 1594687040103 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-995 PublishTime: 1594687040105 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-997 PublishTime: 1594687040113 Date: Tue Jul 14 09:37:46 JST 2020
```
Consumer2:
```
Connected: Tue Jul 14 09:37:03 JST 2020
[pulsar-client-io-1-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
Received: my-message-501 MessageId: 4:1501:-1 PublishTime: 1594687038753 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-502 MessageId: 4:1502:-1 PublishTime: 1594687038755 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-503 MessageId: 4:1503:-1 PublishTime: 1594687038759 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-506 MessageId: 4:1506:-1 PublishTime: 1594687038785 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-508 MessageId: 4:1508:-1 PublishTime: 1594687038812 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-901 MessageId: 4:1901:-1 PublishTime: 1594687039871 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-509 MessageId: 4:1509:-1 PublishTime: 1594687038815 Date: Tue Jul 14 09:37:46 JST 2020
ordering was broken, key: 1 oldNum: 901 newNum: 511
Received: my-message-511 MessageId: 4:1511:-1 PublishTime: 1594687038826 Date: Tue Jul 14 09:37:46 JST 2020
Received: my-message-512 MessageId: 4:1512:-1 PublishTime: 1594687038830 Date: Tue Jul 14 09:37:46 JST 2020
...
```
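The `ordering was broken` line in the Consumer2 output comes from a per-key check in the reproduction client, whose code is not part of this commit. A hypothetical sketch of such a check, assuming message values of the form `my-message-N` and the key `N % 10` from the reproduction steps (the class name `KeyOrderingChecker` is illustrative), could look like this:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-key ordering check (not from this commit).
 * Assumes values look like "my-message-N" and that the key is N % 10,
 * matching the key generation used in the reproduction steps.
 */
public class KeyOrderingChecker {
    // Last message number seen for each key.
    private final Map<Integer, Integer> lastNumByKey = new HashMap<>();

    /** Returns true if the message keeps per-key order, false if it breaks it. */
    public boolean check(String value) {
        int num = Integer.parseInt(value.substring(value.lastIndexOf('-') + 1));
        int key = num % 10;
        Integer oldNum = lastNumByKey.put(key, num); // previous number for this key, or null
        if (oldNum != null && num < oldNum) {
            System.out.println("ordering was broken, key: " + key
                    + " oldNum: " + oldNum + " newNum: " + num);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        KeyOrderingChecker checker = new KeyOrderingChecker();
        // The Consumer2 messages from the log above, in arrival order.
        String[] received = {"my-message-501", "my-message-502", "my-message-503",
                "my-message-506", "my-message-508", "my-message-901",
                "my-message-509", "my-message-511", "my-message-512"};
        for (String value : received) {
            checker.check(value);
        }
    }
}
```

Fed the Consumer2 messages listed above, this sketch prints a single line, `ordering was broken, key: 1 oldNum: 901 newNum: 511`, because messages 901 and 511 both map to key 1.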
I think this issue is caused by https://github.com/apache/pulsar/pull/7105.
Here is an example:
1. Dispatch messages
2. Consumer2 is stuck and `totalMessagesSent=0`
- Consumer2 availablePermits is 0
3. Temporarily skip the messages to redeliver
- Consumer2 availablePermits goes back to 1
4. Dispatch new messages
- a new message is dispatched to Consumer2
5. Go back to the messages to redeliver
6. Dispatch messages
- ordering is broken
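The sequence above can be replayed in a toy Java model. This is a sketch, not the broker code, and everything in it is a simplifying assumption: keys are routed through a fixed map instead of the consistent-hash selector, permits are set by hand between rounds, and skipped messages sit in a sorted map standing in for the dispatcher's redelivery tracking. The class `SkipReplaySimulation` and its methods are hypothetical. Consumer2 owns `key1`, so messages 1 and 2 must reach it in that order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical toy model of the sequence above, WITHOUT the fix.
 * Not the broker code: routing is a fixed map and permits are set by hand.
 */
public class SkipReplaySimulation {
    final Map<String, String> route = new HashMap<>();             // key -> consumer
    final Map<String, Integer> permits = new HashMap<>();          // consumer -> available permits
    final TreeMap<Integer, String> toRedeliver = new TreeMap<>();  // message id -> key, sorted like positions
    final Map<String, List<Integer>> delivered = new HashMap<>();  // consumer -> received ids

    /** Dispatches a batch (message id -> key) and returns how many messages were sent. */
    int dispatch(Map<Integer, String> batch) {
        int sent = 0;
        for (Map.Entry<Integer, String> m : batch.entrySet()) {
            String consumer = route.get(m.getValue());
            int available = permits.get(consumer);
            if (available == 0) {
                toRedeliver.put(m.getKey(), m.getValue()); // skipped, to be replayed later
                continue;
            }
            permits.put(consumer, available - 1);
            delivered.computeIfAbsent(consumer, c -> new ArrayList<>()).add(m.getKey());
            toRedeliver.remove(m.getKey());
            sent++;
        }
        return sent;
    }

    /** Replays the pending messages in position order. */
    int replay() {
        return dispatch(new TreeMap<>(toRedeliver));
    }

    public static List<Integer> run() {
        SkipReplaySimulation sim = new SkipReplaySimulation();
        sim.route.put("key1", "consumer2");
        sim.route.put("key2", "consumer1");
        sim.permits.put("consumer1", 1000);
        sim.permits.put("consumer2", 0);      // Consumer2 is stuck
        sim.toRedeliver.put(1, "key1");       // message 1 is waiting for redelivery
        int sent = sim.replay();              // steps 1-2: nothing can be sent
        if (sent == 0) {
            sim.permits.put("consumer2", 1);  // step 3: a permit comes back while message 1 is skipped
            sim.dispatch(new TreeMap<>(Map.of(2, "key1", 3, "key2"))); // step 4: read new messages
        }
        sim.permits.put("consumer2", 1);      // Consumer2 asks for one more message
        sim.replay();                         // steps 5-6: back to the skipped message
        return sim.delivered.get("consumer2");
    }

    public static void main(String[] args) {
        System.out.println("consumer2 received: " + run());
    }
}
```

Running `main` prints `consumer2 received: [2, 1]`: message 2 overtakes message 1 even though both carry `key1`, which is the same kind of inversion as 901 before 511 in the Consumer2 log.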
### Modifications
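Stop dispatching to a Key_Shared consumer after its messages were temporarily skipped because it was stuck on delivery.
Consumers that have no permits during a dispatch round are collected in `nextStuckConsumers`. If the round sends nothing (and there are no recently joined consumers), they are copied into `stuckConsumers`, and `getRestrictedMaxEntriesForConsumer` returns 0 for them in the next round, so newly read messages cannot overtake the skipped ones. `readMoreEntries()` is still called in either case so that dispatching does not stop.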
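The two-set bookkeeping described above can be sketched without the broker. This is a simplified, hypothetical model: the type parameter `C` stands in for `Consumer`, the method names `beginRound`, `restrictedMaxEntries` and `endRound` are illustrative, and the `recentlyJoinedConsumers` checks from the real dispatcher are left out. Only the set names and the conditions around them follow the diff.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, broker-free sketch of the stuck-consumer bookkeeping.
 * C stands in for the broker's Consumer type; recentlyJoinedConsumers is ignored.
 */
public class StuckConsumerTracker<C> {
    private final Set<C> stuckConsumers = new HashSet<>();
    private final Set<C> nextStuckConsumers = new HashSet<>();

    /** Start of a dispatch round (mirrors nextStuckConsumers.clear()). */
    public void beginRound() {
        nextStuckConsumers.clear();
    }

    /** Mirrors getRestrictedMaxEntriesForConsumer for a consumer that has not recently joined. */
    public int restrictedMaxEntries(C consumer, int maxMessages) {
        if (maxMessages == 0) {
            nextStuckConsumers.add(consumer); // the consumer was stuck
            return 0;
        }
        if (stuckConsumers.contains(consumer)) {
            return 0; // stop dispatching to a consumer that was stuck in the previous round
        }
        return maxMessages;
    }

    /**
     * End of a round. Returns true when the dispatcher would set
     * isDispatcherStuckOnReplays; readMoreEntries() runs either way.
     */
    public boolean endRound(int totalMessagesSent) {
        stuckConsumers.clear();
        if (totalMessagesSent == 0 && !nextStuckConsumers.isEmpty()) {
            stuckConsumers.addAll(nextStuckConsumers);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StuckConsumerTracker<String> tracker = new StuckConsumerTracker<>();
        // Round 1: replaying to consumer2, which has no permits, so nothing is sent.
        tracker.beginRound();
        int round1 = tracker.restrictedMaxEntries("consumer2", 0);
        boolean stuckOnReplays = tracker.endRound(round1);
        // Round 2: consumer2 has a permit again, but the newly read messages must not reach it.
        tracker.beginRound();
        int toConsumer2 = tracker.restrictedMaxEntries("consumer2", 1);
        int toConsumer1 = tracker.restrictedMaxEntries("consumer1", 1);
        tracker.endRound(toConsumer1 + toConsumer2);
        // Round 3: the block is lifted because round 2 sent something.
        tracker.beginRound();
        int replayToConsumer2 = tracker.restrictedMaxEntries("consumer2", 1);
        System.out.println(stuckOnReplays + " " + toConsumer2 + " " + toConsumer1 + " " + replayToConsumer2);
    }
}
```

`main` prints `true 0 1 1`. Because `stuckConsumers` is cleared after every round, the block lasts for exactly one round, which is enough to keep the new read from overtaking the skipped messages while still letting other consumers make progress.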
---
...istentStickyKeyDispatcherMultipleConsumers.java | 26 ++++-
...ntStickyKeyDispatcherMultipleConsumersTest.java | 105 ++++++++++++++++++++-
2 files changed, 127 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index a56f566..095f9ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,12 +65,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
+ private final Set<Consumer> stuckConsumers;
+ private final Set<Consumer> nextStuckConsumers;
+
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf,
KeySharedMeta ksm) {
super(topic, cursor, subscription);
this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ?
Collections.emptyMap() : new HashMap<>();
+ this.stuckConsumers = new HashSet<>();
+ this.nextStuckConsumers = new HashSet<>();
switch (ksm.getKeySharedMode()) {
case AUTO_SPLIT:
@@ -143,6 +149,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return;
}
+ nextStuckConsumers.clear();
+
final Map<Consumer, List<Entry>> groupedEntries =
localGroupedEntries.get();
groupedEntries.clear();
@@ -217,11 +225,14 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
}
+ stuckConsumers.clear();
+
if (totalMessagesSent == 0 && recentlyJoinedConsumers.isEmpty()) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
+ // and stuck is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
@@ -230,18 +241,31 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
- isDispatcherStuckOnReplays = true;
+ if (!nextStuckConsumers.isEmpty()) {
+ isDispatcherStuckOnReplays = true;
+ stuckConsumers.addAll(nextStuckConsumers);
+ }
+ // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch.
readMoreEntries();
}
}
private int getRestrictedMaxEntriesForConsumer(Consumer consumer,
List<Entry> entries, int maxMessages) {
if (maxMessages == 0) {
+ // the consumer was stuck
+ nextStuckConsumers.add(consumer);
return 0;
}
PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
if (maxReadPosition == null) {
+ // stop to dispatch by stuckConsumers
+ if (stuckConsumers.contains(consumer)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
+ }
+ return 0;
+ }
// The consumer has not recently joined, so we can send all messages
return maxMessages;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 6325f10..c281400 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -41,6 +41,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -49,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@PrepareForTest({ DispatchRateLimiter.class })
@@ -79,6 +81,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
configMock = mock(ServiceConfiguration.class);
doReturn(true).when(configMock).isSubscriptionRedeliveryTrackerEnabled();
doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
+ doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
+ doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();
@@ -96,6 +100,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
consumerMock = mock(Consumer.class);
channelMock = mock(ChannelPromise.class);
+ doReturn("consumer1").when(consumerMock).consumerName();
doReturn(1000).when(consumerMock).getAvailablePermits();
doReturn(true).when(consumerMock).isWritable();
doReturn(channelMock).when(consumerMock).sendMessages(
@@ -120,12 +125,17 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
KeySharedMeta.getDefaultInstance());
- persistentDispatcher.addConsumer(consumerMock);
- persistentDispatcher.consumerFlow(consumerMock, 1000);
}
@Test
public void testSendMarkerMessage() {
+ try {
+ persistentDispatcher.addConsumer(consumerMock);
+ persistentDispatcher.consumerFlow(consumerMock, 1000);
+ } catch (Exception e) {
+ fail("Failed to add mock consumer", e);
+ }
+
List<Entry> entries = new ArrayList<>();
ByteBuf markerMessage =
Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId",
"testSourceCluster");
entries.add(EntryImpl.create(1, 1, markerMessage));
@@ -156,11 +166,100 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
}
+ @Test
+ public void testSkipRedeliverTemporally() {
+ final Consumer slowConsumerMock = mock(Consumer.class);
+ final ChannelPromise slowChannelMock = mock(ChannelPromise.class);
+ // add entries to redeliver and read target
+ final List<Entry> redeliverEntries = new ArrayList<>();
+ redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
+ final List<Entry> readEntries = new ArrayList<>();
+ readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
+ readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));
+
+ try {
+ Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
+ totalAvailablePermitsField.setAccessible(true);
+ totalAvailablePermitsField.set(persistentDispatcher, 1000);
+
+ doAnswer(invocationOnMock -> {
+ ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2))
+ .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal);
+ return null;
+ }).when(cursorMock).asyncReadEntriesOrWait(
+ anyInt(), anyLong(), any(PersistentStickyKeyDispatcherMultipleConsumers.class),
+ eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal));
+ } catch (Exception e) {
+ fail("Failed to set to field", e);
+ }
+
+ // Create 2Consumers
+ try {
+ doReturn("consumer2").when(slowConsumerMock).consumerName();
+ // Change slowConsumer availablePermits to 0 and back to normal
+ when(slowConsumerMock.getAvailablePermits())
+ .thenReturn(0)
+ .thenReturn(1);
+ doReturn(true).when(slowConsumerMock).isWritable();
+ doReturn(slowChannelMock).when(slowConsumerMock).sendMessages(
+ anyList(),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+
+ persistentDispatcher.addConsumer(consumerMock);
+ persistentDispatcher.addConsumer(slowConsumerMock);
+ } catch (Exception e) {
+ fail("Failed to add mock consumer", e);
+ }
+
+ // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers
+ // run readMoreEntries internally (and skip internally)
+ // Change slowConsumer availablePermits to 1
+ // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally
+ // and then stop to dispatch to slowConsumer
+ persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, redeliverEntries);
+
+ verify(consumerMock, times(1)).sendMessages(
+ argThat(arg -> {
+ assertEquals(arg.size(), 1);
+ Entry entry = arg.get(0);
+ assertEquals(entry.getLedgerId(), 1);
+ assertEquals(entry.getEntryId(), 3);
+ return true;
+ }),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+ verify(slowConsumerMock, times(0)).sendMessages(
+ anyList(),
+ any(EntryBatchSizes.class),
+ any(EntryBatchIndexesAcks.class),
+ anyInt(),
+ anyLong(),
+ anyLong(),
+ any(RedeliveryTracker.class)
+ );
+ }
+
private ByteBuf createMessage(String message, int sequenceId) {
+ return createMessage(message, sequenceId, "testKey");
+ }
+
+ private ByteBuf createMessage(String message, int sequenceId, String key) {
PulsarApi.MessageMetadata.Builder messageMetadata =
PulsarApi.MessageMetadata.newBuilder();
messageMetadata.setSequenceId(sequenceId);
messageMetadata.setProducerName("testProducer");
- messageMetadata.setPartitionKey("testKey");
+ messageMetadata.setPartitionKey(key);
+ messageMetadata.setPartitionKeyB64Encoded(false);
messageMetadata.setPublishTime(System.currentTimeMillis());
return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8)));
}