This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a62d34abeb0975e66df28fb30ea59f45fd170883 Author: Aloys <[email protected]> AuthorDate: Tue Feb 9 16:18:44 2021 +0800 makes subscription start from MessageId.latest defaultly (#9444) ### Motivation Currently, if the position is not specified when creating a new subscription, the earliest position is used by default. https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L2002 This PR makes subscriptions start from MessageId.latest by default. (cherry picked from commit 28b20946107bd4389129e40c44da1e99b37e65a5) --- .../broker/admin/impl/PersistentTopicsBase.java | 2 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 67 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 16a91f3..d957797 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1894,7 +1894,7 @@ public class PersistentTopicsBase extends AdminResource { return; } } - final MessageIdImpl targetMessageId = messageId == null ? (MessageIdImpl) MessageId.earliest : messageId; + final MessageIdImpl targetMessageId = messageId == null ? 
(MessageIdImpl) MessageId.latest : messageId; log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId); // If the topic name is a partition name, no need to get partition topic metadata again diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 0c787ee..c31b51a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -34,9 +34,11 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -44,6 +46,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache; import org.apache.zookeeper.KeeperException; import org.mockito.ArgumentCaptor; @@ -208,6 +211,70 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } + + + @Test + public void testCreateSubscriptions() 
throws Exception{ + final int numberOfMessages = 5; + final String SUB_EARLIEST = "sub-earliest"; + final String SUB_LATEST = "sub-latest"; + final String SUB_NONE_MESSAGE_ID = "sub-none-message-id"; + + String testLocalTopicName = "subWithPositionOrNot"; + final String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testLocalTopicName; + admin.topics().createNonPartitionedTopic(topicName); + + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .maxPendingMessages(30000).create(); + + // 1) produce numberOfMessages message to pulsar + for (int i = 0; i < numberOfMessages; i++) { + System.out.println(producer.send(new byte[10])); + } + + // 2) Create a subscription from earliest position + + AsyncResponse response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_EARLIEST, true, + (MessageIdImpl) MessageId.earliest, false); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + + TopicStats topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + long msgBacklog = topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog; + System.out.println("Message back log for " + SUB_EARLIEST + " is :" + msgBacklog); + Assert.assertEquals(msgBacklog, numberOfMessages); + + // 3) Create a subscription with form latest position + + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_LATEST, true, + (MessageIdImpl) MessageId.latest, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + 
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + msgBacklog = topicStats.subscriptions.get(SUB_LATEST).msgBacklog; + System.out.println("Message back log for " + SUB_LATEST + " is :" + msgBacklog); + Assert.assertEquals(msgBacklog, 0); + + // 4) Create a subscription without position + + response = mock(AsyncResponse.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, SUB_NONE_MESSAGE_ID, true, + null, false); + responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); + topicStats = persistentTopics.getStats(testTenant, testNamespace, testLocalTopicName, true, true); + msgBacklog = topicStats.subscriptions.get(SUB_NONE_MESSAGE_ID).msgBacklog; + System.out.println("Message back log for " + SUB_NONE_MESSAGE_ID + " is :" + msgBacklog); + Assert.assertEquals(msgBacklog, 0); + + producer.close(); + } + @Test public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException { doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain();
