equanz commented on code in PR #20179:
URL: https://github.com/apache/pulsar/pull/20179#discussion_r1201975192


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##########
@@ -65,23 +67,22 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
     private final KeySharedMode keySharedMode;
 
     /**
-     * When a consumer joins, it will be added to this map with the current read position.
-     * This means that, in order to preserve ordering, new consumers can only receive old
-     * messages, until the mark-delete position will move past this point.
+     * When a consumer joins, it will be added to this map with the current last sent position per the message key.
+     * This means that, in order to preserve ordering per the message key, new consumers can only receive old
+     * messages, until the mark-delete position will move past this point in the key. New consumers can receive
+     * any new messages with the message key that is not in the last sent position.
      */
-    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
-
-    private final Set<Consumer> stuckConsumers;
-    private final Set<Consumer> nextStuckConsumers;
+    private final LinkedHashMap<Consumer, LastSentPositions> recentlyJoinedConsumers;
+    // The lastSentPosition is not thread-safe
+    private final LastSentPositions lastSentPositions;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,

Review Comment:
   @poorbarcode
   
   I understand.
   However, while considering your approach, I found another issue. It is not specific to the approach you proposed; it shows that the read position is not a suitable position to record for recently joined consumers.
   
   ### Details of the issue
   Consider the following flow.
   
   1. Initial state:
      readPosition: `1:1`
      messagesToRedeliver: []
      recentlyJoinedConsumers []
      c1: messagesForC: 1, pending: []
      c2: messagesForC: 1000, pending: []
      selector: key-a: c1, key-b: c2
   2. Dispatch m1 (key: `key-a`, position: `1:1`, type: Normal)
      readPosition: `1:2`
      messagesToRedeliver: []
      recentlyJoinedConsumers []
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 1000, pending: []
      selector: key-a: c1, key-b: c2
   3. Dispatch m2 (key: `key-b`, position: `1:2`, type: Normal)
      readPosition: `1:3`
      messagesToRedeliver: []
      recentlyJoinedConsumers []
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 999, pending: [m2]
      selector: key-a: c1, key-b: c2
   4. Read m3 (key: `key-a`, position: `1:3`, type: Normal). c1 has no permits left, so m3 is added to messagesToRedeliver instead of being dispatched.
      readPosition: `1:4`
      messagesToRedeliver: [m3]
      recentlyJoinedConsumers []
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 999, pending: [m2]
      selector: key-a: c1, key-b: c2
   5. Dispatch m4 (key: `key-b`, position: `1:4`, type: Normal), even though m3 is still waiting to be replayed, because PersistentStickyKeyDispatcherMultipleConsumers has [the isDispatcherStuckOnReplays feature](https://github.com/apache/pulsar/blob/b7f0004313ea4565717cc6d3c0b99aee5c079c6c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L429-L434).
      readPosition: `1:5`
      markDeletePosition: `1:0` (mock)
      messagesToRedeliver: [m3]
      recentlyJoinedConsumers []
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 998, pending: [m2, m4]
      selector: key-a: c1, key-b: c2
   6. Add consumer c3
      readPosition: `1:5`
      messagesToRedeliver: [m3]
      recentlyJoinedConsumers [c3: `1:5`]
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 998, pending: [m2, m4]
      c3: messagesForC: 1000, pending: []
      selector: key-a: c3, key-b: c2
   7. Dispatch m3 (key: `key-a`, position: `1:3`, type: Replay). Because `1:3` is before c3's recently joined position `1:5`, m3 can be sent to c3.
      readPosition: `1:5`
      messagesToRedeliver: []
      recentlyJoinedConsumers [c3: `1:5`]
      c1: messagesForC: 0, pending: [m1]
      c2: messagesForC: 998, pending: [m2, m4]
      c3: messagesForC: 999, pending: [m3]
      selector: key-a: c3, key-b: c2
   8. Disconnect c1 and redeliver m1
      readPosition: `1:5`
      messagesToRedeliver: []
      recentlyJoinedConsumers [c3: `1:5`]
      c2: messagesForC: 998, pending: [m2, m4]
      c3: messagesForC: 998, pending: [m3, m1] // out of order: m1 (1:1) is sent to c3 after m3 (1:3)
      selector: key-a: c3, key-b: c2
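
The admission check behind steps 7 and 8 can be sketched as follows. This is a simplified model, not the dispatcher code: it assumes that positions in ledger 1 can be treated as plain entry IDs (`1:N` becomes `N`) and that a recently joined consumer may receive any entry strictly before the read position recorded when it joined. The class, record and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the read-position join check; not the broker implementation.
final class ReadPositionJoinCheck {
    // Simplification: positions in ledger 1 are modeled as entry IDs (1:3 -> 3).
    record Msg(String name, String key, long entryId) {}

    // Assumed rule: a recently joined consumer may receive an entry only if it is
    // strictly before the read position captured when the consumer joined.
    static boolean allowedForRecentlyJoined(long entryId, long readPositionAtJoin) {
        return entryId < readPositionAtJoin;
    }

    // Replays steps 7 and 8 of the flow for c3, which joined at readPosition 1:5.
    static List<String> deliveredToC3() {
        long readPositionAtJoin = 5;
        List<Msg> arrivals = List.of(
                new Msg("m3", "key-a", 3),  // step 7: replay of m3
                new Msg("m1", "key-a", 1)); // step 8: redelivery of m1 after c1 disconnects
        List<String> delivered = new ArrayList<>();
        for (Msg m : arrivals) {
            if (allowedForRecentlyJoined(m.entryId(), readPositionAtJoin)) {
                delivered.add(m.name());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Both entries pass the check, so key-a reaches c3 as m3 then m1: out of order.
        System.out.println(deliveredToC3()); // prints [m3, m1]
    }
}
```

Because the check only compares against a single position taken at join time, it cannot tell that m1 and m3 share a key and that m1 must go first.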
   
   I reproduced this issue with the following test case.
   Base commit: https://github.com/apache/pulsar/commit/b7f0004313ea4565717cc6d3c0b99aee5c079c6c
   
   ```diff
   diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   index 18fb141be31..66652eec73e 100644
   --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
   @@ -20,12 +20,15 @@ package org.apache.pulsar.client.api;
    
    import static org.testng.Assert.assertEquals;
    import static org.testng.Assert.assertFalse;
   +import static org.testng.Assert.assertNotEquals;
    import static org.testng.Assert.assertNotNull;
    import static org.testng.Assert.assertNull;
    import static org.testng.Assert.assertTrue;
    import static org.testng.Assert.fail;
    import com.google.common.collect.Lists;
   +import com.google.common.collect.Sets;
    import java.io.IOException;
   +import java.lang.reflect.Field;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
   @@ -35,6 +38,8 @@ import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
   +import java.util.NavigableMap;
   +import java.util.NavigableSet;
    import java.util.Optional;
    import java.util.Random;
    import java.util.Set;
   @@ -52,13 +57,17 @@ import lombok.Cleanup;
    import org.apache.bookkeeper.mledger.impl.PositionImpl;
    import org.apache.pulsar.broker.service.Topic;
    import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
   +import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
   +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
    import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
    import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
    import org.apache.pulsar.client.impl.ConsumerImpl;
   +import org.apache.pulsar.client.impl.MessageIdImpl;
    import org.apache.pulsar.common.api.proto.KeySharedMode;
    import org.apache.pulsar.common.naming.TopicDomain;
    import org.apache.pulsar.common.naming.TopicName;
    import org.apache.pulsar.common.schema.KeyValue;
   +import org.apache.pulsar.common.util.Codec;
    import org.apache.pulsar.common.util.Murmur3_32Hash;
    import org.awaitility.Awaitility;
    import org.slf4j.Logger;
   @@ -1191,6 +1200,86 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
            l.await();
        }
    
   +    @Test(timeOut = 30_000)
   +    public void testCheckBetweenSkippingAndRecentlyJoinedConsumers() throws Exception {
   +        conf.setSubscriptionKeySharedUseConsistentHashing(true);
   +        conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100);
   +
   +        final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID();
   +        final String subName = "my-sub";
   +
   +        final ConsumerBuilder<String> cb = pulsarClient.newConsumer(Schema.STRING)
   +                .topic(topicName)
   +                .subscriptionName(subName)
   +                .subscriptionType(SubscriptionType.Key_Shared);
   +
   +        // create 2 consumers
   +        @Cleanup
   +        final Consumer<String> c1 = cb.consumerName("c1").receiverQueueSize(1).subscribe();
   +        @Cleanup
   +        final Consumer<String> c2 = cb.consumerName("c2").receiverQueueSize(1000).subscribe();
   +
   +        @Cleanup
   +        final Producer<String> p = pulsarClient.newProducer(Schema.STRING)
   +                .topic(topicName)
   +                .create();
   +
   +        final Set<String> c1Keys = Set.of("1", "3", "4", "5", "9");
   +        final Set<String> c2Keys = Set.of("0", "2", "6", "7", "8");
   +
   +        // send and receive
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 1));
   +        p.newMessage().key(Integer.toString(1)).value("msg-0").send();
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 0));
   +
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 1000));
   +        p.newMessage().key(Integer.toString(0)).value("msg-1").send();
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 999));
   +
   +        final MessageIdImpl msg2Id = (MessageIdImpl) p.newMessage().key(Integer.toString(1)).value("msg-2").send();
   +        p.newMessage().key(Integer.toString(0)).value("msg-3").send();
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 998));
   +        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 4));
   +        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
   +
   +        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
   +                (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
   +        final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class
   +                .getDeclaredField("redeliveryMessages");
   +        redeliveryMessagesField.setAccessible(true);
   +        final MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField.get(dispatcher);
   +
   +        final NavigableSet<PositionImpl> replayMsgSet = redeliveryMessages.getMessagesToReplayNow(4);
   +        assertEquals(replayMsgSet.size(), 1);
   +        final PositionImpl replayMsg = replayMsgSet.first();
   +        assertEquals(replayMsg.compareTo(msg2Id.getLedgerId(), msg2Id.getEntryId()), 0);
   +        log.error("DEBUG: replayMsgId: {}", replayMsg);
   +
   +        // add c3
   +        @Cleanup
   +        final Consumer<String> c3 = cb.consumerName("c3").subscribe();
   +        final List<Message<String>> c3Msgs = new ArrayList<>();
   +        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
   +
   +        final Set<String> c3Keys = Set.of("1", "2", "6", "7");
   +
   +        Message<String> msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
   +        assertNotNull(msg3);
   +        c3Msgs.add(msg3);
   +
   +        // disconnect
   +        c1.close();
   +
   +        msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
   +        assertNotNull(msg3);
   +        c3Msgs.add(msg3);
   +
   +        // check out-of-order
   +        assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId()) > 0);
   +        log.error("DEBUG: c3Msgs messageId: {}", c3Msgs.stream().map(Message::getMessageId).toList());
   +        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
   +        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
   +    }
    
        private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
            if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
   ```
   
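   Here is the result. The test passes, which means the issue is reproduced: c3 received `3:2` (msg-2, i.e. m3) before `3:0` (msg-0, i.e. m1) while its recently joined position was the read position `3:4`.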
   ```
   % mvn test -pl pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers
   ...
   [INFO] -------------------------------------------------------
   [INFO]  T E S T S
   [INFO] -------------------------------------------------------
   [INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
   [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.545 s - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
   [INFO]
   [INFO] Results:
   [INFO]
   [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
   [INFO]
   [INFO] ------------------------------------------------------------------------
   [INFO] BUILD SUCCESS
   [INFO] ------------------------------------------------------------------------
   [INFO] Total time:  18.220 s
   [INFO] Finished at: 2023-05-22T15:22:03+09:00
   [INFO] ------------------------------------------------------------------------
   
   % grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
   2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1243] - DEBUG: readPosition: 3:4
   2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1256] - DEBUG: replayMsgId: 3:2
   2023-05-22T16:20:00,766 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1262] - DEBUG: recentlyJoinedConsumers: {c3=3:4}
   2023-05-22T16:20:00,770 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1279] - DEBUG: c3Msgs messageId: [3:2:-1, 3:0:-1]
   2023-05-22T16:20:00,774 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1280] - DEBUG: readPosition: 3:4
   2023-05-22T16:20:00,782 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1281] - DEBUG: recentlyJoinedConsumers: {c3=3:4}
   ```
   
   ### How to fix
   I checked whether each proposed approach fixes this issue.
   
   * Your proposed approach: https://github.com/apache/pulsar/pull/20179#discussion_r1198087967
     - Conclusion: It doesn't fix this issue.
     - Reason:
       The first position is the read position. If it is used as the position for recently joined consumers, messages can be delivered out of order, as shown above.
       The second position is the head of the entries being read normally. As we checked before, replay messages (e.g. m3) can be sent immediately after the consumer connects, so this position could be null in this case.
   * Last sent position and individually sent positions approach: https://github.com/apache/pulsar/pull/20179#issuecomment-1539314184
     - Conclusion: It will fix this issue.
     - Reason:
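       This approach guarantees that all messages at or before the last sent position have already been scheduled to be sent. Therefore, skipped messages (e.g. m3) are always after the last sent position, so they cannot be sent to a recently joined consumer until the mark-delete position moves past that position.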
   * Last sent positions approach (this PR): https://github.com/apache/pulsar/pull/20179
     - Conclusion: It will fix this issue.
     - Reason:
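       Same as above. I tested it as follows. The reproduction test now fails at the first `assertNotNull` (right after c3 joins), which means c3 no longer receives the replayed m3 (msg-2) ahead of m1.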
       ```
       % mvn test -pl pulsar-common,pulsar-client-admin-api,managed-ledger,pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers -DfailIfNoTests=false
       ...
       [INFO] -------------------------------------------------------
       [INFO]  T E S T S
       [INFO] -------------------------------------------------------
       [INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
       [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 14.301 s <<< FAILURE! - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
       [ERROR] testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest)  Time elapsed: 0.914 s  <<< FAILURE!
       java.lang.AssertionError: expected object to not be null
               at org.testng.Assert.fail(Assert.java:110)
               at org.testng.Assert.assertNotNull(Assert.java:1319)
               at org.testng.Assert.assertNotNull(Assert.java:1303)
               at org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(KeySharedSubscriptionTest.java:1443)
               at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
               at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
               at java.base/java.lang.reflect.Method.invoke(Method.java:568)
               at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
               at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
               at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
               at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
               at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
               at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
               at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
               at java.base/java.lang.Thread.run(Thread.java:833)
       
       [INFO]
       [INFO] Results:
       [INFO]
       [ERROR] Failures:
       [ERROR] org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest)
       [INFO]   Run 1: PASS
       [ERROR]   Run 2: KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers:1443 expected object to not be null
       [INFO]
       [INFO]
       [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
       [INFO]
       [INFO] ------------------------------------------------------------------------
       [INFO] Reactor Summary for Pulsar Client Admin :: API 3.0.0-SNAPSHOT:
       [INFO]
       [INFO] Pulsar Client Admin :: API ......................... SUCCESS [  1.076 s]
       [INFO] Pulsar Common ...................................... SUCCESS [  3.927 s]
       [INFO] Managed Ledger ..................................... SUCCESS [  1.756 s]
       [INFO] Pulsar Broker ...................................... FAILURE [ 23.586 s]
       [INFO] ------------------------------------------------------------------------
       [INFO] BUILD FAILURE
       [INFO] ------------------------------------------------------------------------
       [INFO] Total time:  31.095 s
       [INFO] Finished at: 2023-05-22T17:45:15+09:00
       [INFO] ------------------------------------------------------------------------
       
       % grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
       2023-05-22T17:45:06,279 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
       2023-05-22T17:45:06,280 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
       2023-05-22T17:45:06,298 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65342=[4:0, 4:3]}
       2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
       2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
       2023-05-22T17:45:11,377 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65353=[4:0, 4:3]}
       ```
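
For comparison with the read-position check, a per-key variant can be sketched as follows, modeled loosely on the Javadoc in the diff hunk at the top. It is a hypothetical simplification, not the code in this PR. It assumes that the snapshot taken when a consumer joins maps each key to the last sent entry ID (in the flow: key-a to m1 at `1:1`, key-b to m4 at `1:4`, which corresponds to the `[4:0, 4:3]` snapshot in the log above), and that a key becomes unrestricted once the mark-delete position reaches its last sent position. All class, record and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical per-key join check; a sketch, not the code in this PR.
final class LastSentPositionsJoinCheck {
    // Simplification: positions in ledger 1 are modeled as entry IDs (1:3 -> 3).
    record Msg(String name, String key, long entryId) {}

    static boolean allowedForRecentlyJoined(Msg m, Map<String, Long> lastSentAtJoin,
                                            long markDeletePosition) {
        Long lastSent = lastSentAtJoin.get(m.key());
        if (lastSent == null) {
            return true; // no message with this key had been sent when the consumer joined
        }
        if (markDeletePosition >= lastSent) {
            return true; // everything up to this key's last sent position is acknowledged
        }
        return m.entryId() <= lastSent; // otherwise only "old" messages of this key may go out
    }

    // One dispatch pass: send every allowed message, keep the rest for a later pass.
    static void dispatchPass(List<Msg> toReplay, Map<String, Long> lastSentAtJoin,
                             long markDeletePosition, List<String> delivered) {
        Iterator<Msg> it = toReplay.iterator();
        while (it.hasNext()) {
            Msg m = it.next();
            if (allowedForRecentlyJoined(m, lastSentAtJoin, markDeletePosition)) {
                delivered.add(m.name());
                it.remove();
            }
        }
    }

    static List<String> deliveredToC3() {
        Map<String, Long> lastSentAtJoin = Map.of("key-a", 1L, "key-b", 4L);
        List<Msg> toReplay = new ArrayList<>();
        List<String> delivered = new ArrayList<>();
        long markDeletePosition = 0; // mock 1:0, as in the flow

        toReplay.add(new Msg("m3", "key-a", 3)); // step 7: m3 is held back (3 > 1)
        dispatchPass(toReplay, lastSentAtJoin, markDeletePosition, delivered);

        toReplay.add(new Msg("m1", "key-a", 1)); // step 8: m1 is redelivered (1 <= 1)
        dispatchPass(toReplay, lastSentAtJoin, markDeletePosition, delivered);

        markDeletePosition = 1; // assume c3 acknowledges m1, moving mark-delete to 1:1
        dispatchPass(toReplay, lastSentAtJoin, markDeletePosition, delivered);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliveredToC3()); // prints [m1, m3]
    }
}
```

In this model m3 is not delivered right after c3 joins, which is consistent with the first `assertNotNull` failing in the run above, and key-a reaches c3 in order once m1 is acknowledged.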
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
