This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f493676 support GetLastMessageId api (#3196)
f493676 is described below
commit f4936763af1c1dab54d031ae355e40d66d3a602e
Author: legendtkl <[email protected]>
AuthorDate: Fri Dec 21 13:19:28 2018 +0800
support GetLastMessageId api (#3196)
issue ticket: https://github.com/apache/pulsar/issues/3162 (Fixes #3162)
The PR is straightforward: it exposes the persistent topic's
GetLastMessageId through the REST API:
/admin/v2/persistent/{tenant}/{namespace}/{topic}/lastMessageId
---
.../broker/admin/impl/PersistentTopicsBase.java | 17 +++
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++
.../broker/admin/AdminApiGetLastMessageIdTest.java | 142 +++++++++++++++++++++
3 files changed, 173 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1605d8c..de1af66 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1450,4 +1450,21 @@ public class PersistentTopicsBase extends AdminResource {
}
return;
}
+
+    /**
+     * Builds the {@link MessageId} of the last message position reported by the
+     * persistent topic identified by {@code topicName}.
+     *
+     * @param authoritative forwarded unchanged to {@code validateAdminOperationOnTopic}
+     * @return a {@link MessageIdImpl} built from the ledger id and entry id of the
+     *         topic's last position, plus the partition index derived from the topic name
+     * @throws RestException with {@code METHOD_NOT_ALLOWED} when the resolved topic is
+     *         not a {@link PersistentTopic}
+     */
+    protected MessageId internalGetLastMessageId(boolean authoritative) {
+        validateAdminOperationOnTopic(authoritative);
+
+        // Resolve the topic once so the type check and the cast below are
+        // guaranteed to operate on the same object.
+        Topic topic = getTopicReference(topicName);
+        if (!(topic instanceof PersistentTopic)) {
+            log.error("[{}] Not supported operation of non-persistent topic {}", clientAppId(), topicName);
+            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                    "GetLastMessageId on a non-persistent topic is not allowed");
+        }
+
+        PersistentTopic persistentTopic = (PersistentTopic) topic;
+        // NOTE(review): assumes the returned Position is always a PositionImpl, as the
+        // original code did - confirm against PersistentTopic#getLastMessageId.
+        PositionImpl lastPosition = (PositionImpl) persistentTopic.getLastMessageId();
+        int partitionIndex = TopicName.getPartitionIndex(persistentTopic.getName());
+
+        return new MessageIdImpl(lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex);
+    }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 437407b..350da17 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -498,4 +498,18 @@ public class PersistentTopics extends PersistentTopicsBase
{
validateTopicName(tenant, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
}
+
+ @GET
+ @Path("/{tenant}/{namespace}/{topic}/lastMessageId")
+ @ApiOperation(value = "Return the last commit message id of topic")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 405, message = "Operation not allowed on
persistent topic"),
+ @ApiResponse(code = 404, message = "Topic does not exist")})
+ public MessageId getLastMessageId(@PathParam("tenant") String tenant,
+ @PathParam("namespace")
String namespace,
+ @PathParam("topic")
@Encoded String encodedTopic,
+
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ validateTopicName(tenant, namespace, encodedTopic);
+ return internalGetLastMessageId(authoritative);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
new file mode 100644
index 0000000..cc5cf0e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.admin.v2.PersistentTopics;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.web.PulsarWebResource;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.UriInfo;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests {@link PersistentTopics#getLastMessageId} by calling the resource object
+ * directly on top of the mocked broker provided by {@link MockedPulsarServiceBaseTest}.
+ */
+public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
+
+    private PersistentTopics persistentTopics;
+    private final String testTenant = "my-tenant";
+    private final String testLocalCluster = "use";
+    private final String testNamespace = "my-namespace";
+    protected Field uriField;
+    protected UriInfo uriInfo;
+
+    /** Prepares reflective access to the resource's {@code uri} field and a mocked {@link UriInfo}. */
+    @BeforeClass
+    public void initPersistentTopics() throws Exception {
+        uriField = PulsarWebResource.class.getDeclaredField("uri");
+        uriField.setAccessible(true);
+        uriInfo = mock(UriInfo.class);
+    }
+
+    /**
+     * Starts the mocked broker, creates cluster {@code test}, tenant {@code prop} and
+     * namespace {@code prop/ns-abc}, then builds a spied {@link PersistentTopics}
+     * resource whose environment lookups are stubbed.
+     */
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("prop",
+                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("prop/ns-abc");
+        admin.namespaces().setNamespaceReplicationClusters("prop/ns-abc", Sets.newHashSet("test"));
+        persistentTopics = spy(new PersistentTopics());
+        persistentTopics.setServletContext(new MockServletContext());
+        persistentTopics.setPulsar(pulsar);
+
+        doReturn(mockZookKeeper).when(persistentTopics).globalZk();
+        doReturn(mockZookKeeper).when(persistentTopics).localZk();
+        doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache();
+        doReturn(pulsar.getConfigurationCache().policiesCache()).when(persistentTopics).policiesCache();
+        doReturn(false).when(persistentTopics).isRequestHttps();
+        doReturn(null).when(persistentTopics).originalPrincipal();
+        doReturn("test").when(persistentTopics).clientAppId();
+        doReturn("persistent").when(persistentTopics).domain();
+        doNothing().when(persistentTopics).validateAdminAccessForTenant(this.testTenant);
+        doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
+    }
+
+    @Override
+    @AfterMethod
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Verifies that a missing topic is rejected with "Topic not found", and that after
+     * publishing N and then 2N un-batched messages the reported entry id is N-1 and 2N-1.
+     */
+    @Test
+    public void testGetLastMessageId() throws Exception {
+        // my-tenant/my-namespace is never created in setup(), so this lookup must fail.
+        try {
+            persistentTopics.getLastMessageId(testTenant, testNamespace, "my-topic", true);
+            // AssertionError is not an Exception, so it is not swallowed by the catch below.
+            Assert.fail("Expected getLastMessageId to fail for a topic that does not exist");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getMessage(), "Topic not found");
+        }
+
+        String key = "legendtkl";
+        final String topicName = "persistent://prop/ns-abc/my-topic";
+        final String messagePredicate = "my-message-" + key + "-";
+        final int numberOfMessages = 30;
+
+        // Batching is disabled; the entry-id assertions below rely on one entry per send.
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        try {
+            publish(producer, messagePredicate, numberOfMessages);
+
+            MessageId id = persistentTopics.getLastMessageId("prop", "ns-abc", "my-topic", true);
+            Assert.assertTrue(((MessageIdImpl) id).getLedgerId() >= 0);
+            // TestNG signature is assertEquals(actual, expected).
+            Assert.assertEquals(((MessageIdImpl) id).getEntryId(), numberOfMessages - 1);
+
+            // After a second round the last entry id should be numberOfMessages * 2 - 1.
+            publish(producer, messagePredicate, numberOfMessages);
+
+            id = persistentTopics.getLastMessageId("prop", "ns-abc", "my-topic", true);
+            Assert.assertTrue(((MessageIdImpl) id).getLedgerId() > 0);
+            Assert.assertEquals(((MessageIdImpl) id).getEntryId(), 2 * numberOfMessages - 1);
+        } finally {
+            producer.close();
+        }
+    }
+
+    /** Synchronously sends {@code count} messages named {@code prefix + index} through {@code producer}. */
+    private static void publish(Producer<byte[]> producer, String prefix, int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            String message = prefix + i;
+            producer.send(message.getBytes());
+        }
+    }
+}