equanz commented on code in PR #20179:
URL: https://github.com/apache/pulsar/pull/20179#discussion_r1201975192
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -65,23 +67,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private final KeySharedMode keySharedMode;
/**
- * When a consumer joins, it will be added to this map with the current read position.
- * This means that, in order to preserve ordering, new consumers can only receive old
- * messages, until the mark-delete position will move past this point.
+ * When a consumer joins, it will be added to this map with the current last sent position per the message key.
+ * This means that, in order to preserve ordering per the message key, new consumers can only receive old
+ * messages, until the mark-delete position will move past this point in the key. New consumers can receive
+ * any new messages with the message key that is not in the last sent position.
*/
- private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
-
- private final Set<Consumer> stuckConsumers;
- private final Set<Consumer> nextStuckConsumers;
+ private final LinkedHashMap<Consumer, LastSentPositions> recentlyJoinedConsumers;
+ // The lastSentPosition is not thread-safe
+ private final LastSentPositions lastSentPositions;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Review Comment:
@poorbarcode
I understand.
However, while considering your approach, I found another issue. This issue is not specific to the approach you proposed. It shows that the read position is not a suitable position to record for recently joined consumers.
### Details of the issue
Consider the following flow.
1. Initial state:
   - readPosition: `1:1`
   - messagesToRedeliver: []
   - recentlyJoinedConsumers: []
   - c1: messagesForC: 1, pending: []
   - c2: messagesForC: 1000, pending: []
   - selector: key-a: c1, key-b: c2
2. Dispatch m1 (key: `key-a`, position: `1:1`, type: Normal)
   - readPosition: `1:2`
   - messagesToRedeliver: []
   - recentlyJoinedConsumers: []
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 1000, pending: []
   - selector: key-a: c1, key-b: c2
3. Dispatch m2 (key: `key-b`, position: `1:2`, type: Normal)
   - readPosition: `1:3`
   - messagesToRedeliver: []
   - recentlyJoinedConsumers: []
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 999, pending: [m2]
   - selector: key-a: c1, key-b: c2
4. Read m3 (key: `key-a`, position: `1:3`, type: Normal). c1 has no permits left, so m3 is added to messagesToRedeliver instead of being dispatched.
   - readPosition: `1:4`
   - messagesToRedeliver: [m3]
   - recentlyJoinedConsumers: []
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 999, pending: [m2]
   - selector: key-a: c1, key-b: c2
5. Dispatch m4 (key: `key-b`, position: `1:4`, type: Normal) even though m3 is still waiting to be replayed, because PersistentStickyKeyDispatcherMultipleConsumers has [the isDispatcherStuckOnReplays feature](https://github.com/apache/pulsar/blob/b7f0004313ea4565717cc6d3c0b99aee5c079c6c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L429-L434).
   - readPosition: `1:5`
   - markDeletePosition: `1:0` (mock)
   - messagesToRedeliver: [m3]
   - recentlyJoinedConsumers: []
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 998, pending: [m2, m4]
   - selector: key-a: c1, key-b: c2
6. Add consumer c3. It is recorded in recentlyJoinedConsumers with the current read position `1:5`.
   - readPosition: `1:5`
   - messagesToRedeliver: [m3]
   - recentlyJoinedConsumers: [c3: `1:5`]
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 998, pending: [m2, m4]
   - c3: messagesForC: 1000, pending: []
   - selector: key-a: c1, key-b: c2
7. Replay m3 (key: `key-a`, position: `1:3`, type: Replay) to c3. key-a is now assigned to c3, and `1:3` is before c3's recorded position `1:5`, so m3 is allowed.
   - readPosition: `1:5`
   - messagesToRedeliver: []
   - recentlyJoinedConsumers: [c3: `1:5`]
   - c1: messagesForC: 0, pending: [m1]
   - c2: messagesForC: 998, pending: [m2, m4]
   - c3: messagesForC: 999, pending: [m3]
   - selector: key-a: c3, key-b: c2
8. Disconnect c1. m1 is redelivered to c3.
   - readPosition: `1:5`
   - messagesToRedeliver: []
   - recentlyJoinedConsumers: [c3: `1:5`]
   - c2: messagesForC: 998, pending: [m2, m4]
   - c3: messagesForC: 998, pending: [m3, m1] (out of order: m3 is delivered before m1)
   - selector: key-a: c3, key-b: c2
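Steps 6 to 8 can be reduced to a minimal Java sketch of the current read-position rule. This is a toy model, not Pulsar code. `ReadPositionRuleSketch`, `canSend`, and `replayFlow` are hypothetical names. Positions are plain `long`s (`1` stands for `1:1`, `5` for `1:5`), and the selector and permits are left out. The only assumption is the one described above: a recently joined consumer may receive any entry before the position recorded when it joined.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical, not Pulsar code) of the read-position rule for recently joined consumers.
class ReadPositionRuleSketch {

    // A recently joined consumer may only receive entries before the position recorded at join time.
    static boolean canSend(Map<String, Long> recentlyJoined, String consumer, long position) {
        Long joinedAt = recentlyJoined.get(consumer);
        return joinedAt == null || position < joinedAt;
    }

    // Replays steps 6-8 of the flow and returns the order in which c3 receives entries.
    static List<Long> replayFlow() {
        Map<String, Long> recentlyJoined = new HashMap<>();
        List<Long> c3Pending = new ArrayList<>();

        long readPosition = 5;                  // after step 5 the read position is 1:5
        recentlyJoined.put("c3", readPosition); // step 6: c3 joins and is recorded at 1:5

        // step 7: m3 (key-a, 1:3) is replayed; key-a is now assigned to c3
        if (canSend(recentlyJoined, "c3", 3)) {
            c3Pending.add(3L);
        }
        // step 8: c1 disconnects and m1 (key-a, 1:1) is redelivered to c3
        if (canSend(recentlyJoined, "c3", 1)) {
            c3Pending.add(1L);
        }
        return c3Pending;
    }

    public static void main(String[] args) {
        List<Long> c3Pending = replayFlow();
        System.out.println("c3 pending: " + c3Pending);                                // c3 pending: [3, 1]
        System.out.println("out of order: " + (c3Pending.get(0) > c3Pending.get(1))); // out of order: true
    }
}
```

Both entries pass the check because both are before `1:5`. The rule cannot tell that m3 was skipped while m1, which has the same key, was still pending on c1.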
I reproduced this issue with the following test case.
Base commit: https://github.com/apache/pulsar/commit/b7f0004313ea4565717cc6d3c0b99aee5c079c6c
```diff
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 18fb141be31..66652eec73e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -35,6 +38,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -52,13 +57,17 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -1191,6 +1200,86 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         l.await();
     }
 
+    @Test(timeOut = 30_000)
+    public void testCheckBetweenSkippingAndRecentlyJoinedConsumers() throws Exception {
+        conf.setSubscriptionKeySharedUseConsistentHashing(true);
+        conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100);
+
+        final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        final ConsumerBuilder<String> cb = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared);
+
+        // create 2 consumers
+        @Cleanup
+        final Consumer<String> c1 = cb.consumerName("c1").receiverQueueSize(1).subscribe();
+        @Cleanup
+        final Consumer<String> c2 = cb.consumerName("c2").receiverQueueSize(1000).subscribe();
+
+        @Cleanup
+        final Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        final Set<String> c1Keys = Set.of("1", "3", "4", "5", "9");
+        final Set<String> c2Keys = Set.of("0", "2", "6", "7", "8");
+
+        // send and receive
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 1));
+        p.newMessage().key(Integer.toString(1)).value("msg-0").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 0));
+
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 1000));
+        p.newMessage().key(Integer.toString(0)).value("msg-1").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 999));
+
+        final MessageIdImpl msg2Id = (MessageIdImpl) p.newMessage().key(Integer.toString(1)).value("msg-2").send();
+        p.newMessage().key(Integer.toString(0)).value("msg-3").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 998));
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 4));
+        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
+
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        final MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField.get(dispatcher);
+
+        final NavigableSet<PositionImpl> replayMsgSet = redeliveryMessages.getMessagesToReplayNow(4);
+        assertEquals(replayMsgSet.size(), 1);
+        final PositionImpl replayMsg = replayMsgSet.first();
+        assertEquals(replayMsg.compareTo(msg2Id.getLedgerId(), msg2Id.getEntryId()), 0);
+        log.error("DEBUG: replayMsgId: {}", replayMsg);
+
+        // add c3
+        @Cleanup
+        final Consumer<String> c3 = cb.consumerName("c3").subscribe();
+        final List<Message<String>> c3Msgs = new ArrayList<>();
+        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
+
+        final Set<String> c3Keys = Set.of("1", "2", "6", "7");
+
+        Message<String> msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(msg3);
+        c3Msgs.add(msg3);
+
+        // disconnect
+        c1.close();
+
+        msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(msg3);
+        c3Msgs.add(msg3);
+
+        // check out-of-order
+        assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId()) > 0);
+        log.error("DEBUG: c3Msgs messageId: {}", c3Msgs.stream().map(Message::getMessageId).toList());
+        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
+        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
+    }
     private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
         if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
```
Here is the result.
```
% mvn test -pl pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers
...
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.545 s - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 18.220 s
[INFO] Finished at: 2023-05-22T15:22:03+09:00
[INFO] ------------------------------------------------------------------------
% grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1243] - DEBUG: readPosition: 3:4
2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1256] - DEBUG: replayMsgId: 3:2
2023-05-22T16:20:00,766 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1262] - DEBUG: recentlyJoinedConsumers: {c3=3:4}
2023-05-22T16:20:00,770 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1279] - DEBUG: c3Msgs messageId: [3:2:-1, 3:0:-1]
2023-05-22T16:20:00,774 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1280] - DEBUG: readPosition: 3:4
2023-05-22T16:20:00,782 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1281] - DEBUG: recentlyJoinedConsumers: {c3=3:4}
```
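The debug output above can be read the same way. The minimal sketch below orders positions by ledger ID and then entry ID, which is how managed-ledger positions such as `PositionImpl` compare. The nested `Pos` record and its `parse` helper are hypothetical stand-ins, and the trailing `-1` (partition index) of the printed message IDs is dropped. The sketch checks the two facts the log shows:
- The replayed entry `3:2` (msg-2, key `"1"`) is before c3's recorded position `3:4`, so the read-position rule admits it.
- c3 then receives `3:2` before `3:0` (msg-0, also key `"1"`).

```java
import java.util.List;

class DebugOutputCheck {

    // Hypothetical stand-in for a managed-ledger position: ordered by ledgerId, then entryId.
    record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
        // Parses the "ledgerId:entryId" form printed in the log, e.g. "3:2".
        static Pos parse(String s) {
            String[] parts = s.split(":");
            return new Pos(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    public static void main(String[] args) {
        Pos recentlyJoinedAt = Pos.parse("3:4"); // recentlyJoinedConsumers: {c3=3:4}
        Pos replayed = Pos.parse("3:2");         // replayMsgId: 3:2 (msg-2)
        List<Pos> c3Received = List.of(Pos.parse("3:2"), Pos.parse("3:0")); // c3Msgs messageId

        // The read-position rule admits the replayed entry because 3:2 is before 3:4.
        System.out.println("replay admitted: " + (replayed.compareTo(recentlyJoinedAt) < 0));
        // c3 receives 3:2 before 3:0, i.e. out of order for key "1".
        System.out.println("out of order: " + (c3Received.get(0).compareTo(c3Received.get(1)) > 0));
    }
}
```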
### How to fix
I checked whether each of the proposed approaches fixes this issue.
* Your proposed approach: https://github.com/apache/pulsar/pull/20179#discussion_r1198087967
  - Conclusion: It doesn't fix this issue.
  - Reason: The 1st position is the read position. If this position is recorded for recently joined consumers, messages can be delivered out of order, as shown above. The 2nd position is the head of the normal reading entries. As we checked before, replay messages (e.g. m3) can be sent immediately after the consumer connects, so this position could be null in this case.
* Last sent position and individually sent positions approach: https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184
  - Conclusion: It will fix this issue.
  - Reason: This approach guarantees that all messages less than or equal to the last sent position have already been scheduled to be sent. Therefore, skipped messages (e.g. m3) are always greater than the last sent position and cannot reach the recently joined consumer ahead of older messages with the same key.
* Last sent positions approach (this PR): https://github.com/apache/pulsar/pull/20179
  - Conclusion: It will fix this issue.
  - Reason: Same as above. I tested it as follows. The test now fails at the first `assertNotNull(msg3)` (`KeySharedSubscriptionTest.java:1443` on this branch), which means c3 no longer receives the replayed message right after joining. This failure is the expected result, because the test asserts the out-of-order behavior.
```
% mvn test -pl pulsar-common,pulsar-client-admin-api,managed-ledger,pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers -DfailIfNoTests=false
...
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
[ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 14.301 s <<< FAILURE! - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
[ERROR] testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest) Time elapsed: 0.914 s <<< FAILURE!
java.lang.AssertionError: expected object to not be null
    at org.testng.Assert.fail(Assert.java:110)
    at org.testng.Assert.assertNotNull(Assert.java:1319)
    at org.testng.Assert.assertNotNull(Assert.java:1303)
    at org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(KeySharedSubscriptionTest.java:1443)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
    at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
    at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
    at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
[INFO]
[INFO] Results:
[INFO]
[ERROR] Failures:
[ERROR] org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest)
[INFO] Run 1: PASS
[ERROR] Run 2: KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers:1443 expected object to not be null
[INFO]
[INFO]
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Pulsar Client Admin :: API 3.0.0-SNAPSHOT:
[INFO]
[INFO] Pulsar Client Admin :: API ......................... SUCCESS [ 1.076 s]
[INFO] Pulsar Common ...................................... SUCCESS [ 3.927 s]
[INFO] Managed Ledger ..................................... SUCCESS [ 1.756 s]
[INFO] Pulsar Broker ...................................... FAILURE [ 23.586 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 31.095 s
[INFO] Finished at: 2023-05-22T17:45:15+09:00
[INFO] ------------------------------------------------------------------------
% grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
2023-05-22T17:45:06,279 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
2023-05-22T17:45:06,280 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
2023-05-22T17:45:06,298 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65342=[4:0, 4:3]}
2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
2023-05-22T17:45:11,377 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65353=[4:0, 4:3]}
```
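For comparison, here is a minimal sketch of the per-key rule described in the Javadoc of the diff hunk:
- When a consumer joins, the last sent position of each key is snapshotted.
- For a key in the snapshot, the new consumer only receives entries at or before that position until the mark-delete position moves past it.
- Keys missing from the snapshot are unrestricted.

`LastSentPositionsSketch` is a hypothetical toy, not the PR's `LastSentPositions` class. It assumes that the `[4:0, 4:3]` recorded for c3 in the output above are the last sent positions of key `"1"` (key-a in the flow) and key `"0"` (key-b). Positions are plain entry IDs within ledger 4.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not the PR's LastSentPositions) of the per-key rule for a recently joined consumer.
class LastSentPositionsSketch {
    // Snapshot taken when the consumer joined: key -> last position already sent for that key.
    private final Map<String, Long> lastSentAtJoin;

    LastSentPositionsSketch(Map<String, Long> lastSentPerKey) {
        this.lastSentAtJoin = new HashMap<>(lastSentPerKey);
    }

    boolean canSend(String key, long position, long markDeletePosition) {
        Long limit = lastSentAtJoin.get(key);
        if (limit == null || markDeletePosition >= limit) {
            // The key was not in the snapshot, or the mark-delete position has moved past its limit.
            return true;
        }
        // Otherwise only entries at or before the snapshot position (old messages) may be sent.
        return position <= limit;
    }

    public static void main(String[] args) {
        // Entry IDs in ledger 4: m1 = 0 (key "1"), m2 = 1 (key "0"), m3 = 2 (key "1"), m4 = 3 (key "0").
        LastSentPositionsSketch c3 = new LastSentPositionsSketch(Map.of("1", 0L, "0", 3L));
        long nothingAcked = -1;

        System.out.println("m3 replay before m1 is acked: " + c3.canSend("1", 2, nothingAcked)); // false
        System.out.println("m1 redelivery: " + c3.canSend("1", 0, nothingAcked));                // true
        System.out.println("m3 after m1 is acked: " + c3.canSend("1", 2, 0));                    // true
    }
}
```

Under this rule the replayed m3 is held back for c3 while m1 is still unacknowledged. This matches the first `c3.receive` returning nothing in the run above.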