sijie closed pull request #1440: "Update default values for a few publisher settings". URL: https://github.com/apache/incubator-pulsar/pull/1440
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d3d15d926..ff85e3436 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -28,6 +28,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; @@ -74,7 +75,10 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.producerBuilder = client.newProducer() // - .topic(topicName).sendTimeout(0, TimeUnit.SECONDS) // + .topic(topicName) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 14fc41b56..2524fa2ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -640,7 +640,11 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0)); // Force topic creation and namespace being loaded - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/my-topic").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1/my-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); admin.persistentTopics().delete("persistent://prop-xyz/ns1/my-topic"); @@ -804,8 +808,11 @@ public void partitionedTopics(String topicName) throws Exception { admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1"); assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); - Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; @@ -859,7 +866,11 @@ public void partitionedTopics(String topicName) throws Exception { } catch (ConflictException ce) { } - producer = client.newProducer().topic(partitionedTopicName).create(); + producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); topics = admin.persistentTopics().getList("prop-xyz/ns1"); assertEquals(topics.size(), 4); @@ -918,7 +929,11 @@ public void testNamespaceSplitBundle() throws Exception { // Force to create a topic final String namespace = 
"prop-xyz/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -944,7 +959,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { // Force to create a topic final String namespace = "prop-xyz/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -1037,7 +1056,11 @@ public void testNamespaceUnloadBundle() throws Exception { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1095,7 +1118,11 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1148,7 +1175,11 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { .subscribe(); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1157,7 +1188,11 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { producer.close(); // Create producer - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds1").create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop-xyz/ns1-bundles/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer1.send(message.getBytes()); @@ -1251,7 +1286,11 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < 
(messages + startIdx); i++) { String message = "message-" + i; @@ -1296,7 +1335,11 @@ public void statsOnNonExistingTopics() throws Exception { @Test public void testDeleteFailedReturnCode() throws Exception { String topicName = "persistent://prop-xyz/ns1/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { admin.persistentTopics().delete(topicName); @@ -1684,8 +1727,11 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception { Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1") .subscriptionName("my-sub").subscribe(); - Producer<byte[]> producer = client.newProducer().topic("persistent://prop-xyz/ns1/ds1") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://prop-xyz/ns1/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index c53396a39..ef23f1973 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -173,8 +173,11 @@ public void testIncrementPartitionsOfTopic() throws Exception { .toString(); // (3) produce messages to all partitions including newly created partitions (RoundRobin) - Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> 
producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); final int totalMessages = newPartitions * 2; for (int i = 0; i < totalMessages; i++) { String message = "message-" + i; @@ -268,7 +271,11 @@ public void nonPersistentTopics() throws Exception { } private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -331,7 +338,11 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception { admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0)); assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0)); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); @@ -381,7 +392,7 @@ public void testUnloadTopic(final String topicType) throws Exception { assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // recreation of producer will load the topic again - producer = pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer().topic(topicName).create(); topic = pulsar.getBrokerService().getTopicReference(topicName).get(); 
assertNotNull(topic); // unload the topic @@ -502,7 +513,11 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -741,7 +756,12 @@ public void testPublishConsumerStats() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = client.newProducer().topic(topic).producerName(producerName).create(); + Producer<byte[]> producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .producerName(producerName) + .create(); retryStrategically((test) -> { PersistentTopicStats stats; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 7428afd87..f3762ac9d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -805,8 +805,11 @@ public void partitionedTopics(String topicName) throws Exception { admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1"); assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub")); - Producer<byte[]> producer = 
client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; @@ -860,7 +863,11 @@ public void partitionedTopics(String topicName) throws Exception { } catch (ConflictException ce) { } - producer = client.newProducer().topic(partitionedTopicName).create(); + producer = client.newProducer() + .topic(partitionedTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); topics = admin.persistentTopics().getList("prop-xyz/use/ns1"); assertEquals(topics.size(), 4); @@ -918,7 +925,11 @@ public void testNamespaceSplitBundle() throws Exception { // Force to create a topic final String namespace = "prop-xyz/use/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -944,7 +955,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { // Force to create a topic final String namespace = "prop-xyz/use/ns1"; final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + 
.messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send("message".getBytes()); publishMessagesOnPersistentTopic(topicName, 0); assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName)); @@ -1064,7 +1079,11 @@ public void testNamespaceUnloadBundle() throws Exception { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1/ds2").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1121,8 +1140,11 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception { Lists.newArrayList("my-sub")); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2") - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1174,8 +1196,11 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { .subscribe(); // Create producer - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds2") - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); @@ -1184,8 +1209,11 @@ public void testClearBacklogOnNamespace(Integer 
numBundles) throws Exception { producer.close(); // Create producer - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop-xyz/use/ns1-bundles/ds1") - .create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop-xyz/use/ns1-bundles/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer1.send(message.getBytes()); @@ -1278,7 +1306,11 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -1323,7 +1355,11 @@ public void statsOnNonExistingTopics() throws Exception { @Test public void testDeleteFailedReturnCode() throws Exception { String topicName = "persistent://prop-xyz/use/ns1/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { admin.persistentTopics().delete(topicName); @@ -1711,8 +1747,11 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception { Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/use/ns1/ds1") .subscriptionName("my-sub").subscribe(); - Producer<byte[]> producer = client.newProducer().topic("persistent://prop-xyz/use/ns1/ds1") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> 
producer = client.newProducer() + .topic("persistent://prop-xyz/use/ns1/ds1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); for (int i = 0; i < 10; i++) { String message = "message-" + i; producer.send(message.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java index 55b2b7b0f..781a932a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java @@ -167,7 +167,9 @@ public void testIncrementPartitionsOfTopic() throws Exception { // (3) produce messages to all partitions including newly created partitions (RoundRobin) Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); final int totalMessages = newPartitions * 2; for (int i = 0; i < totalMessages; i++) { String message = "message-" + i; @@ -261,7 +263,11 @@ public void nonPersistentTopics() throws Exception { } private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -324,7 +330,11 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception { admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0)); assertEquals(admin.namespaces().getPersistence(namespace), new 
PersistencePolicies(3, 3, 3, 50.0)); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); @@ -495,7 +505,11 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception { } private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = startIdx; i < (messages + startIdx); i++) { String message = "message-" + i; @@ -708,7 +722,11 @@ public void testNonPersistentTopics() throws Exception { Set<String> topicNames = Sets.newHashSet(); for (int i = 0; i < totalTopics; i++) { topicNames.add(topicName + i); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName + i).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName + i) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); } @@ -735,7 +753,12 @@ public void testPublishConsumerStats() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build(); Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = client.newProducer().topic(topic).producerName(producerName).create(); + Producer<byte[]> producer = 
client.newProducer() + .topic(topic) + .producerName(producerName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); retryStrategically((test) -> { PersistentTopicStats stats; @@ -777,8 +800,11 @@ public void testTenantNameWithUnderscore() throws Exception { String topic = "persistent://prop_xyz/use/my-namespace/my-topic"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopicStats stats = admin.persistentTopics().getStats(topic); assertEquals(stats.publishers.size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 9f9ab3549..87187c648 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.FutureUtil; @@ -530,10 +531,16 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception { consumer.close(); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxMessages(numMsgsInBatch).enableBatching(true) - .create(); + .batchingMaxPublishDelay(5, TimeUnit.SECONDS) + .batchingMaxMessages(numMsgsInBatch) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // create producer to 
publish non batch messages - Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> noBatchProducer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); List<CompletableFuture<MessageId>> sendFutureList = Lists.newArrayList(); for (int i = 0; i < numMsgs / 2; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 53c49d7bf..bc5c210b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -159,7 +159,10 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { assertEquals(subRef.getDispatcher().getType(), SubType.Failover); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); @@ -354,7 +357,8 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; 
futures.add(producer.sendAsync(message.getBytes())); @@ -545,7 +549,10 @@ public void testActiveConsumerFailoverWithDelay() throws Exception { // enqueue messages List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; futures.add(producer.sendAsync(message.getBytes())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index 2bc152a2d..4b659f45d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -99,7 +100,10 @@ public void testSimpleConsumerEvents() throws Exception { assertEquals(subRef.getDispatcher().getType(), SubType.Shared); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs * 2); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs * 2; i++) { String message = "my-message-" + i; 
futures.add(producer.sendAsync(message.getBytes())); @@ -256,8 +260,11 @@ public void testConsumersWithDifferentPermits() throws Exception { }).subscribe(); List<CompletableFuture<MessageId>> futures = Lists.newArrayListWithCapacity(numMsgs); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).maxPendingMessages(numMsgs + 1) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .maxPendingMessages(numMsgs + 1) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "msg-" + i; futures.add(producer.sendAsync(message.getBytes())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index c04e3bd10..8da33cd3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -72,6 +73,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; +import org.testng.annotations.Ignore; import org.testng.annotations.Test; /** @@ -95,7 +97,11 @@ public void testSimpleProducerEvents() throws Exception { final String topicName = "persistent://prop/ns-abc/topic0"; // 1. 
producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -136,7 +142,11 @@ public void testSimpleConsumerEvents() throws Exception { Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertEquals(getAvailablePermits(subRef), 1000 /* default */); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs * 2; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -210,7 +220,11 @@ public void testConsumerFlowControl() throws Exception { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .receiverQueueSize(recvQueueSize).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -254,7 +268,11 @@ public void testActiveSubscriptionWithCache() throws Exception { // (1) Create subscription Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .receiverQueueSize(recvQueueSize).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + 
.enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (2) Produce Messages for (int i = 0; i < recvQueueSize / 2; i++) { @@ -325,7 +343,11 @@ public Void call() throws Exception { }); } - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < recvQueueSize * numConsumersThreads; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -352,7 +374,11 @@ public void testGracefulClose() throws Exception { final String topicName = "persistent://prop/ns-abc/topic4"; final String subName = "sub4"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -415,7 +441,11 @@ public void testSimpleCloseTopic() throws Exception { final String subName = "sub5"; Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -443,7 +473,11 @@ public void testSingleClientMultipleSubscriptions() throws Exception { final String subName = "sub6"; 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); try { pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); fail("Should have thrown an exception since one consumer is already connected"); @@ -676,7 +710,11 @@ public void testMessageExpiry() throws Exception { consumer.close(); assertFalse(subRef.getDispatcher().isConsumerConnected()); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -719,7 +757,11 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception { assertTrue(subRef.getDispatcher().isConsumerConnected()); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < numMsgs; i++) { String message = "my-message-" + i; producer.send(message.getBytes()); @@ -820,7 +862,11 @@ public void testReceiveWithTimeout() throws Exception { ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) .subscriptionName(subName).receiverQueueSize(1000).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); 
assertEquals(consumer.getAvailablePermits(), 0); @@ -847,7 +893,11 @@ public void testProducerReturnedMessageId() throws Exception { final String topicName = "persistent://prop/ns-abc/topic-xyz"; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -896,11 +946,17 @@ public void testProducerQueueFullBlocking() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); // 1. Producer connect - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topicName) - .maxPendingMessages(messages).blockIfQueueFull(true).sendTimeout(1, TimeUnit.SECONDS).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer() + .topic(topicName) + .maxPendingMessages(messages) + .blockIfQueueFull(true) + .sendTimeout(1, TimeUnit.SECONDS) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Stop broker - cleanup(); + super.internalCleanup(); // 2. producer publish messages long startTime = System.nanoTime(); @@ -937,11 +993,17 @@ public void testProducerQueueFullNonBlocking() throws Exception { // 1. 
Producer connect PulsarClient client = PulsarClient.builder().serviceUrl(brokerUrl.toString()).build(); - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topicName) - .maxPendingMessages(messages).blockIfQueueFull(false).sendTimeout(1, TimeUnit.SECONDS).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer() + .topic(topicName) + .maxPendingMessages(messages) + .blockIfQueueFull(false) + .sendTimeout(1, TimeUnit.SECONDS) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Stop broker - cleanup(); + super.internalCleanup(); // 2. producer publish messages long startTime = System.nanoTime(); @@ -980,9 +1042,16 @@ public void testDeleteTopics() throws Exception { BrokerService brokerService = pulsar.getBrokerService(); // 1. producers connect - Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-1").create(); - /* Producer<byte[]> producer2 = */ pulsarClient.newProducer().topic("persistent://prop/ns-abc/topic-2") - .create(); + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic("persistent://prop/ns-abc/topic-1") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + /* Producer<byte[]> producer2 = */ pulsarClient.newProducer() + .topic("persistent://prop/ns-abc/topic-2") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); brokerService.updateRates(); @@ -1021,8 +1090,12 @@ public void testCompression(CompressionType compressionType) throws Exception { final String topicName = "persistent://prop/ns-abc/topic0" + compressionType; // 1. 
producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).compressionType(compressionType) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(compressionType) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); @@ -1057,7 +1130,11 @@ public void testBrokerTopicStats() throws Exception { statsUpdater.shutdown(); final String namespace = "prop/ns-abc"; - Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/topic0").create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic("persistent://" + namespace + "/topic0") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 1. producer publish messages for (int i = 0; i < 10; i++) { String message = "my-message-" + i; @@ -1086,7 +1163,10 @@ public void testPayloadCorruptionDetection() throws Exception { final String topicName = "persistent://prop/ns-abc/topic1"; // 1. 
producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); Message<byte[]> msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build(); @@ -1141,7 +1221,11 @@ public void testMessageRedelivery() throws Exception { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Shared).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (1) Produce messages for (int i = 0; i < totalMessages; i++) { @@ -1196,7 +1280,11 @@ public void testMessageReplay() throws Exception { Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -1258,8 +1346,11 @@ public void testMessageReplay() throws Exception { public void testCreateProducerWithSameName() throws Exception { String topic = "persistent://prop/ns-abc/testCreateProducerWithSameName"; - ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topic) - .producerName("test-producer-a"); 
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() + .topic(topic) + .producerName("test-producer-a") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition); Producer<byte[]> p1 = producerBuilder.create(); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 50dfc3c00..8d2804f5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RawMessage; @@ -209,7 +210,10 @@ public void testConcurrentReplicator() throws Exception { final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0)); PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); - Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()).create(); + Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get(); @@ -850,7 +854,10 @@ public void verifyChecksumAfterReplication() throws Exception { final String topicName = "persistent://pulsar/ns/checksumAfterReplication"; PulsarClient c1 = 
PulsarClient.builder().serviceUrl(url1.toString()).build(); - Producer<byte[]> p1 = c1.newProducer().topic(topicName).create(); + Producer<byte[]> p1 = c1.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PulsarClient c2 = PulsarClient.builder().serviceUrl(url2.toString()).build(); RawReader reader2 = RawReader.create(c2, topicName, "sub").get(); @@ -897,7 +904,10 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws // load namespace with dummy topic on ns PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString()).build(); - client.newProducer().topic("persistent://" + namespace + "/dummyTopic").create(); + client.newProducer().topic("persistent://" + namespace + "/dummyTopic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // persistent topic test try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 5f5432cd1..f834a2415 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -267,7 +268,11 @@ void shutdown() throws Exception { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - producer = client.newProducer().topic(topicName).create(); + 
producer = client.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); } @@ -276,12 +281,11 @@ void shutdown() throws Exception { this.namespace = dest.getNamespace(); this.topicName = dest.toString(); client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build(); - ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topicName); - if (batch) { - producerBuilder.enableBatching(true); - producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS); - producerBuilder.batchingMaxMessages(5); - } + ProducerBuilder<byte[]> producerBuilder = client.newProducer() + .topic(topicName) + .enableBatching(batch) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .batchingMaxMessages(5); producer = producerBuilder.create(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index 2a590eb95..c5ab30bc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -73,7 +73,10 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception { HashSet<String> messageDataHashSet = new HashSet<String>(); // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -159,7 +162,10 @@ public void testSharedSingleAckedNormalTopic() throws Exception { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. 
producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -239,7 +245,10 @@ public void testFailoverSingleAckedNormalTopic() throws Exception { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -355,7 +364,10 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); @@ -414,7 +426,9 @@ public void testExclusiveSingleAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. 
Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -468,7 +482,8 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) @@ -564,7 +579,9 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName) @@ -645,7 +662,10 @@ public void testFailoverInactiveConsumer() throws Exception { final String messagePredicate = "my-message-" + key + "-"; final int totalMessages = 10; // 1. 
producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index 549d2b79c..9bc273064 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; @@ -65,7 +66,10 @@ protected void cleanup() throws Exception { @Test public void testSimpleTermination() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -84,7 +88,10 @@ public void testSimpleTermination() throws Exception { @Test public void testCreateProducerOnTerminatedTopic() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -103,7 +110,10 @@ public void testCreateProducerOnTerminatedTopic() throws Exception { @Test(timeOut = 20000) public void testTerminateWhilePublishing() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CyclicBarrier barrier = new CyclicBarrier(2); List<CompletableFuture<MessageId>> futures = new ArrayList<>(); @@ -148,7 +158,10 @@ public void testTerminateWhilePublishing() throws Exception { @Test public void testDoubleTerminate() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */producer.send("test-msg-1".getBytes()); /* MessageId msgId2 = */producer.send("test-msg-2".getBytes()); @@ -176,7 +189,10 @@ public void testTerminatePartitionedTopic() throws Exception { @Test(timeOut = 20000) public void testSimpleTerminationConsumer() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); @@ -211,7 +227,10 @@ public void testSimpleTerminationConsumer() throws Exception { @Test(timeOut = 
20000) public void testSimpleTerminationMessageListener() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(1); @@ -248,7 +267,10 @@ public void reachedEndOfTopic(Consumer<byte[]> consumer) { @Test(timeOut = 20000) public void testSimpleTerminationReader() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); MessageId msgId1 = producer.send("test-msg-1".getBytes()); MessageId msgId2 = producer.send("test-msg-2".getBytes()); @@ -277,7 +299,10 @@ public void testSimpleTerminationReader() throws Exception { @Test(timeOut = 20000) public void testSimpleTerminationReaderListener() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(1); @@ -311,7 +336,10 @@ public void reachedEndOfTopic(Reader<byte[]> reader) { @Test(timeOut = 20000) public void testSubscribeOnTerminatedTopic() throws Exception { - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); /* MessageId msgId1 = */ producer.send("test-msg-1".getBytes()); MessageId msgId2 = producer.send("test-msg-2".getBytes()); @@ -327,7 +355,10 @@ public void testSubscribeOnTerminatedTopic() 
throws Exception { @Test(timeOut = 20000) public void testSubscribeOnTerminatedTopicWithNoMessages() throws Exception { - pulsarClient.newProducer().topic(topicName).create(); + pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); admin.persistentTopics().terminateTopicAsync(topicName).get(); org.apache.pulsar.client.api.Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 9b6281935..d23c49235 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -337,7 +337,8 @@ public void testPartitionTopicLookup() throws Exception { /**** broker-2 started ****/ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index e0c46acfa..aba0e1b4d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -600,7 +600,9 @@ public void testBrokerSubscriptionRecovery(boolean unloadBundleGracefully) throw .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, 
TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/unacked-topic") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); CountDownLatch latch = new CountDownLatch(totalProducedMsgs); // (1) Produced Messages diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 52deb52f1..553eec89c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -147,7 +147,10 @@ public void testPartitionedNonPersistentTopic(SubscriptionType type) throws Exce Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") .subscriptionType(type).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); int totalProduceMsg = 500; for (int i = 0; i < totalProduceMsg; i++) { @@ -191,7 +194,10 @@ public void testPartitionedNonPersistentTopicWithTcpLookup(SubscriptionType type Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("subscriber-1") .subscriptionType(type).subscribe(); - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // Ensure all partitions exist for (int i = 0; i < numPartitions; i++) { @@ -493,7 +499,10 @@ public void testReplicator() throws Exception { ConsumerImpl<byte[]> repl3Consumer = (ConsumerImpl<byte[]>) 
client3.newConsumer().topic(globalTopicName) .subscriptionName("subscriber-1").subscribe(); - Producer<byte[]> producer = client1.newProducer().topic(globalTopicName).create(); + Producer<byte[]> producer = client1.newProducer().topic(globalTopicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Thread.sleep(timeWaitToSync); @@ -778,7 +787,10 @@ public void testMsgDropStat() throws Exception { Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2") .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe(); - ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); String firstTimeConnected = producer.getConnectedSince(); ExecutorService executor = Executors.newFixedThreadPool(5); byte[] msgData = "testData".getBytes(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index 583026585..247a7734b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -73,7 +73,8 @@ public void testRoundRobinProducer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); @@ -250,7 +251,10 @@ public void testInvalidSequence() throws Exception { // ok } - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.close(); try { @@ -289,7 +293,10 @@ public void testSillyUser() throws Exception { } try { - producer = pulsarClient.newProducer().topic(topicName.toString()).create(); + producer = pulsarClient.newProducer().topic(topicName.toString()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe(); producer.send("message1".getBytes()); producer.send("message2".getBytes()); @@ -343,7 +350,8 @@ public void testAsyncPartitionedProducerConsumer() throws Exception { admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscriptionType(SubscriptionType.Shared).subscribe(); @@ -391,7 +399,8 @@ public void testAsyncPartitionedProducerConsumerQueueSizeOne() throws Exception admin.persistentTopics().createPartitionedTopic(topicName.toString(), numPartitions); Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName.toString()) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + 
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").receiverQueueSize(1).subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 8a536f2a3..fb2c2b40a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -130,10 +130,13 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs, int ackTimeout Consumer<byte[]> consumer = consumerBuilder.subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://my-property/tp1/my-ns/my-topic2"); + .topic("persistent://my-property/tp1/my-ns/my-topic2") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = producerBuilder.create(); @@ -185,10 +188,13 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs, Consumer<byte[]> consumer = consumerBuilder.subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://my-property/tp1/my-ns/my-topic2"); + .topic("persistent://my-property/tp1/my-ns/my-topic2") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = 
producerBuilder.create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index b5dcd87dd..ad406ce2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -591,16 +591,22 @@ public void testSendBigMessageSizeButCompressed() throws Exception { final String topic = "persistent://my-property/my-ns/bigMsg"; // (a) non-batch msg with compression - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4) - .create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4) + .create(); Message<byte[]> message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]) .build(); producer.send(message); producer.close(); // (b) batch-msg - producer = pulsarClient.newProducer().topic(topic).enableBatching(true).compressionType(CompressionType.LZ4) - .create(); + producer = pulsarClient.newProducer().topic(topic) + .enableBatching(true) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4) + .create(); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); try { producer.send(message); @@ -611,7 +617,11 @@ public void testSendBigMessageSizeButCompressed() throws Exception { producer.close(); // (c) non-batch msg without compression - producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.NONE).create(); + producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + 
.compressionType(CompressionType.NONE) + .create(); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); try { producer.send(message); @@ -622,7 +632,11 @@ public void testSendBigMessageSizeButCompressed() throws Exception { producer.close(); // (d) non-batch msg with compression and try to consume message - producer = pulsarClient.newProducer().topic(topic).compressionType(CompressionType.LZ4).create(); + producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .compressionType(CompressionType.LZ4).create(); Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe(); byte[] content = new byte[PulsarDecoder.MaxMessageSize + 10]; message = MessageBuilder.create().setContent(content).build(); @@ -911,7 +925,9 @@ public void testSendCallBack() throws Exception { final int totalMsg = 100; Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic1") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < totalMsg; i++) { final String message = "my-message-" + i; Message<byte[]> msg = MessageBuilder.create().setContent(message.getBytes()).build(); @@ -1294,7 +1310,10 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Producer<byte[]> producer = pulsarClient.newProducer() - .topic("persistent://my-property/my-ns/unacked-topic").create(); + .topic("persistent://my-property/my-ns/unacked-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // (1) Produced Messages for (int i = 0; i < totalProducedMsgs; i++) { @@ -1423,6 +1442,8 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception { 
producerBuidler.enableBatching(true); producerBuidler.batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerBuidler.batchingMaxMessages(5); + } else { + producerBuidler.enableBatching(false); } Producer<byte[]> producer = producerBuidler.create(); @@ -1868,7 +1889,9 @@ public void testSharedSamePriorityConsumer() throws Exception { Consumer<byte[]> c1 = consumerBuilder.subscribe(); Consumer<byte[]> c2 = consumerBuilder.subscribe(); Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic2") - .create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); List<Future<MessageId>> futures = Lists.newArrayList(); // Asynchronously produce messages @@ -2274,8 +2297,12 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe // 2. Producer with valid key name Producer<byte[]> producer = pulsarClient.newProducer() - .topic("persistent://my-property/use/myenc-ns/myenc-topic1").addEncryptionKey("client-rsa.pem") - .cryptoKeyReader(new EncKeyReader()).create(); + .topic("persistent://my-property/use/myenc-ns/myenc-topic1") + .addEncryptionKey("client-rsa.pem") + .cryptoKeyReader(new EncKeyReader()) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); for (int i = 0; i < totalMsg; i++) { String message = "my-message-" + i; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 80ac52068..36238ba77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -122,6 +122,8 @@ public void testSyncProducerAndConsumer(int batchMessageDelayMs) throws Exceptio producerConf.setBatchingEnabled(true); 
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); @@ -157,6 +159,8 @@ public void testAsyncProducerAndAsyncAck(int batchMessageDelayMs) throws Excepti producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -214,6 +218,8 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic3", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -249,6 +255,8 @@ public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic4", producerConf); @@ -299,6 +307,8 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception { producerConf.setBatchingMaxPublishDelay(2 * batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + 
producerConf.setBatchingEnabled(false); } producerConf.setSendTimeout(1, TimeUnit.SECONDS); @@ -520,6 +530,8 @@ public Void call() throws Exception { producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); producerConf.setBatchingEnabled(true); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic7", producerConf); for (int i = 0; i < recvQueueSize; i++) { @@ -627,6 +639,7 @@ public void testSendBigMessageSizeButCompressed() throws Exception { // (a) non-batch msg with compression ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.LZ4); Producer producer = pulsarClient.createProducer(topic, producerConf); Message message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); @@ -649,6 +662,7 @@ public void testSendBigMessageSizeButCompressed() throws Exception { // (c) non-batch msg without compression producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.NONE); producer = pulsarClient.createProducer(topic, producerConf); message = MessageBuilder.create().setContent(new byte[PulsarDecoder.MaxMessageSize + 1]).build(); @@ -662,6 +676,7 @@ public void testSendBigMessageSizeButCompressed() throws Exception { // (d) non-batch msg with compression and try to consume message producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); producerConf.setCompressionType(CompressionType.LZ4); producer = pulsarClient.createProducer(topic, producerConf); Consumer consumer = pulsarClient.subscribe(topic, "sub1"); @@ -704,6 +719,8 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { producerConf.setBatchingEnabled(true); 
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } /************ usecase-1: *************/ @@ -959,6 +976,7 @@ public void testSendCallBack() throws Exception { final int totalMsg = 100; final ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); for (int i = 0; i < totalMsg; i++) { final String message = "my-message-" + i; @@ -1358,6 +1376,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio .subscribe("persistent://my-property/use/my-ns/unacked-topic", "subscriber-1", conf); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", producerConf); @@ -1494,6 +1513,8 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception { producerConf.setBatchingEnabled(true); producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS); producerConf.setBatchingMaxMessages(5); + } else { + producerConf.setBatchingEnabled(false); } Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/unacked-topic", @@ -1961,6 +1982,7 @@ public void testSharedSamePriorityConsumer() throws Exception { Consumer c2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name", conf1); ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf); List<Future<MessageId>> futures = Lists.newArrayList(); @@ -2366,6 +2388,7 @@ public EncryptionKeyInfo getPrivateKey(String 
keyName, Map<String, String> keyMe final int totalMsg = 10; ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(false); Message msg = null; Set<String> messageSet = Sets.newHashSet(); @@ -2389,6 +2412,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe producerConf = new ProducerConfiguration(); producerConf.setCryptoKeyReader(new EncKeyReader()); producerConf.addEncryptionKey("client-rsa.pem"); + producerConf.setBatchingEnabled(false); Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/myenc-topic1", producerConf); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 6d6fc924c..48775e061 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; @@ -89,7 +90,10 @@ public void producerSendAsync() throws PulsarClientException { final int numberOfMessages = 30; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. 
Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -181,7 +185,10 @@ public void partitionedProducerSendAsync() throws PulsarClientException, PulsarA admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions); // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -230,7 +237,10 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions); // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -279,7 +289,10 @@ public void testChecksumVersionComptability() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. 
producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); ProducerImpl<byte[]> producer = spy(prod); // return higher version compare to broker : so, it forces client-producer to remove checksum from payload doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion(); @@ -344,7 +357,10 @@ public void testChecksumReconnection() throws Exception { final String topicName = "persistent://prop/use/ns-abc/topic1"; // 1. producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); ProducerImpl<byte[]> producer = spy(prod); // mock: broker-doesn't support checksum (remote_version < brokerChecksumSupportedVersion) so, it forces // client-producer to perform checksum-strip from msg at reconnection @@ -420,8 +436,12 @@ public void testCorruptMessageRemove() throws Exception { final String topicName = "persistent://prop/use/ns-abc/retry-topic"; // 1. 
producer connect - ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName) - .sendTimeout(10, TimeUnit.MINUTES).create(); + ProducerImpl<byte[]> prod = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .sendTimeout(10, TimeUnit.MINUTES) + .create(); ProducerImpl<byte[]> producer = spy(prod); Field producerIdField = ProducerImpl.class.getDeclaredField("producerId"); producerIdField.setAccessible(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 9624e8e54..92c2ccc4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -32,6 +32,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -144,11 +145,15 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) 
.messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -301,11 +306,15 @@ public void testStartEmptyPatternConsumer() throws Exception { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -368,11 +377,15 @@ public void testAutoSubscribePatternConsumer() throws Exception { int totalMessages = 30; Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -415,6 +428,7 @@ public void testAutoSubscribePatternConsumer() throws Exception { String topicName4 = "persistent://my-property/my-ns/pattern-topic-4-" + key; admin.persistentTopics().createPartitionedTopic(topicName4, 4); Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topicName4) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -471,11 +485,15 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { int totalMessages = 30; 
Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 053cb5ea1..8e4c7fa2d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -61,7 +61,10 @@ public void testSharedAckedNormalTopic() throws Exception { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -156,7 +159,10 @@ public void testExclusiveAckedNormalTopic() throws Exception { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. 
Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -251,7 +257,10 @@ public void testFailoverAckedNormalTopic() throws Exception { final int totalMessages = 15; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -356,7 +365,8 @@ public void testSharedAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 5e7383e22..75e666ad7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; @@ -77,7 +78,12 @@ public void cleanup() throws Exception { private Set<String> publishMessages(String topic, int count) throws 
Exception { Set<String> keys = new HashSet<>(); - try (Producer<byte[]> producer = pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) { + try (Producer<byte[]> producer = pulsarClient.newProducer() + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .maxPendingMessages(count) + .topic(topic) + .create()) { Future<?> lastFuture = null; for (int i = 0; i < count; i++) { String key = "key"+i; @@ -232,8 +238,13 @@ public void testFlowControl() throws Exception { public void testBatchingExtractKeysAndIds() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic"; - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.sendAsync(MessageBuilder.create() .setKey("key1").setContent("my-content-1".getBytes()).build()); producer.sendAsync(MessageBuilder.create() @@ -265,8 +276,13 @@ public void testBatchingExtractKeysAndIds() throws Exception { public void testBatchingRebatch() throws Exception { String topic = "persistent://my-property/my-ns/my-raw-topic"; - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.sendAsync(MessageBuilder.create() 
.setKey("key1").setContent("my-content-1".getBytes()).build()); producer.sendAsync(MessageBuilder.create() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 6aa574ddf..83e70e5f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -151,11 +152,15 @@ public void testSyncProducerAndConsumer() throws Exception { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -212,11 +217,15 @@ public void testAsyncConsumer() throws Exception { // 1. 
producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -291,11 +300,15 @@ public void testConsumerUnackedRedelivery() throws Exception { // 1. producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); @@ -431,11 +444,15 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { // 1. 
producer connect Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicName3) + .enableBatching(false) .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) .create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index e6adf5c90..42c957f09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -64,7 +64,10 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -119,7 +122,10 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception { final int totalMessages = 10; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. 
Create consumer Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -166,7 +172,9 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -252,7 +260,9 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception { // 1. producer connect Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); // 2. Create consumer Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) @@ -341,7 +351,10 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException, final int totalMessages = 3; // 1. producer connect - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 2. 
Create consumer ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index 6cdc5069b..2f23554c5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClientException; @@ -94,7 +95,10 @@ public void zeroQueueSizeNormalConsumer() throws PulsarClientException { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) @@ -128,7 +132,10 @@ public void zeroQueueSizeSharedSubscription() throws PulsarClientException { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. 
Create Consumer int numOfSubscribers = 4; @@ -166,7 +173,10 @@ public void zeroQueueSizeFailoverSubscription() throws PulsarClientException { final String messagePredicate = "my-message-" + key + "-"; // 2. Create Producer - Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // 3. Create Consumer ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName) @@ -217,11 +227,14 @@ public void testFailedZeroQueueSizeBatchMessage() throws PulsarClientException { .subscribe(); ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer() - .topic("persistent://prop-xyz/use/ns-abc/topic1"); + .topic("persistent://prop-xyz/use/ns-abc/topic1") + .messageRoutingMode(MessageRoutingMode.SinglePartition); if (batchMessageDelayMs != 0) { producerBuilder.enableBatching(true).batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS) .batchingMaxMessages(5); + } else { + producerBuilder.enableBatching(false); } Producer<byte[]> producer = producerBuilder.create(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 8f64ef544..a0382bad1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -94,7 +95,11 @@ public void 
testCompaction() throws Exception { final int numMessages = 20; final int maxKeys = 10; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); List<Pair<String, byte[]>> all = new ArrayList<>(); @@ -147,7 +152,10 @@ public void testCompaction() throws Exception { public void testReadCompactedBeforeCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -185,7 +193,10 @@ public void testReadCompactedBeforeCompaction() throws Exception { public void testReadEntriesAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -214,7 +225,10 @@ public void testReadEntriesAfterCompaction() throws Exception { public void testSeekEarliestAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); 
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); @@ -253,7 +267,10 @@ public void testSeekEarliestAfterCompaction() throws Exception { public void testBrokerRestartAfterCompaction() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -293,7 +310,10 @@ public void testBrokerRestartAfterCompaction() throws Exception { public void testCompactEmptyTopic() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); @@ -317,7 +337,7 @@ public void testFirstMessageRetained() throws Exception { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producer = pulsarClient.createProducer(topic)) { + try (Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create()) { producer.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -365,9 +385,14 @@ public void testBatchMessageIdsDontChange() throws Exception { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producer = 
pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create() + ) { producer.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -425,10 +450,17 @@ public void testWholeBatchCompactedOut() throws Exception { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); - Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producerNormal = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + Producer producerBatch = pulsarClient.newProducer().topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producerBatch.sendAsync(MessageBuilder.create() .setKey("key1") .setContent("my-message-1".getBytes()).build()); @@ -533,10 +565,17 @@ public void testEmptyPayloadDeletes() throws Exception { pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") .readCompacted(true).subscribe().close(); - try (Producer producerNormal = pulsarClient.newProducer().topic(topic).create(); - Producer producerBatch = pulsarClient.newProducer().topic(topic).maxPendingMessages(3) - .enableBatching(true).batchingMaxMessages(3) - .batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) { + try (Producer producerNormal = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .create(); + Producer producerBatch = pulsarClient.newProducer() + 
.topic(topic) + .maxPendingMessages(3) + .enableBatching(true) + .batchingMaxMessages(3) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .create()) { // key0 persists through it all producerNormal.sendAsync(MessageBuilder.create() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 84bc3959b..f41c62c29 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.impl.RawMessageImpl; @@ -117,7 +118,10 @@ public void testCompaction() throws Exception { final int numMessages = 1000; final int maxKeys = 10; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); Random r = new Random(0); @@ -138,7 +142,10 @@ public void testCompaction() throws Exception { public void testCompactAddCompact() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); Map<String, byte[]> expected = new HashMap<>(); @@ -168,7 +175,10 @@ public void testCompactAddCompact() throws Exception { public void 
testCompactedInOrder() throws Exception { String topic = "persistent://my-property/use/my-ns/my-topic1"; - Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); producer.send(MessageBuilder.create() .setKey("c") diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index 03d82a785..a9723a00b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -115,7 +115,7 @@ * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. * <p> * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail - * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * unless blockIfQueueFull is set to true. Use {@link #blockIfQueueFull(boolean)} to change the blocking behavior. * * @param maxPendingMessages * @return @@ -148,13 +148,15 @@ /** * Set the message routing mode for the partitioned producer. * - * Default routing mode for messages to partition. + * Default routing mode is round-robin routing. * * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a * particular message. * * @param messageRoutingMode * the message routing mode + * @return producer builder + * @see MessageRoutingMode */ ProducerBuilder<T> messageRoutingMode(MessageRoutingMode messageRoutingMode); @@ -205,9 +207,13 @@ * messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or * contents. 
* - * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages + * When enabled default batch delay is set to 1 ms and default batch size is 1000 messages * + * <p>Batching is enabled by default since 2.0.0. + * + * @return producer builder. * @see #batchingMaxPublishDelay(long, TimeUnit) + * @see #batchingMaxMessages(int) */ ProducerBuilder<T> enableBatching(boolean enableBatching); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 49cd1d8f4..1af077be9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -184,10 +184,11 @@ public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) { } /** - * Set the message routing mode for the partitioned producer + * Set the message routing mode for the partitioned producer. * - * @param mode - * @return + * @param messageRouteMode message routing mode. + * @return producer configuration + * @see MessageRoutingMode */ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) { checkNotNull(messageRouteMode); @@ -197,9 +198,10 @@ public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRou } /** - * Get the message routing mode for the partitioned producer + * Get the message routing mode for the partitioned producer. * - * @return + * @return message routing mode, default is round-robin routing. + * @see MessageRoutingMode#RoundRobinPartition */ public MessageRoutingMode getMessageRoutingMode() { return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString()); @@ -269,9 +271,12 @@ public MessageRouter getMessageRouter() { } /** - * @ return if batch messages are enabled + * Return the flag whether automatic message batching is enabled or not. 
+ * + * @return true if batch messages are enabled. otherwise false. + * @since 2.0.0 <br> + * It is enabled by default. */ - public boolean getBatchingEnabled() { return conf.isBatchingEnabled(); } @@ -290,8 +295,8 @@ public boolean getBatchingEnabled() { * @since 1.0.36 <br> * Make sure all the consumer applications have been updated to use this client version, before starting to * batch messages. + * */ - public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) { conf.setBatchingEnabled(batchMessagesEnabled); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 1449a454f..9f068091b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -50,7 +50,7 @@ private boolean blockIfQueueFull = false; private int maxPendingMessages = 1000; private int maxPendingMessagesAcrossPartitions = 50000; - private MessageRoutingMode messageRoutingMode = MessageRoutingMode.SinglePartition; + private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition; private HashingScheme hashingScheme = HashingScheme.JavaStringHash; private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL; @@ -58,9 +58,9 @@ @JsonIgnore private MessageRouter customMessageRouter = null; - private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(10); + private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1); private int batchingMaxMessages = 1000; - private boolean batchingEnabled = false; // disabled by default + private boolean batchingEnabled = true; // enabled by default @JsonIgnore private CryptoKeyReader cryptoKeyReader; diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 68b789a57..c6f10613e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -85,8 +85,11 @@ public void testProducer() throws Exception { public void testProducerConsumer() throws Exception { PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort()) .build(); - Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/producer-consumer-topic") - .create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://sample/test/local/producer-consumer-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); // Create a consumer directly attached to broker Consumer<byte[]> consumer = pulsarClient.newConsumer() @@ -116,8 +119,10 @@ public void testPartitions() throws Exception { .build(); admin.persistentTopics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); - Producer<byte[]> producer = client.newProducer().topic("persistent://sample/test/local/partitioned-topic") - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); + Producer<byte[]> producer = client.newProducer() + .topic("persistent://sample/test/local/partitioned-topic") + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // Create a consumer directly attached to broker Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic") diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index c2c3ee940..5ed8adfcb 100644 --- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -251,6 +251,9 @@ private void updateSentMsgStats(long msgSize, long latencyUsec) { private ProducerConfiguration getProducerConfiguration() { ProducerConfiguration conf = new ProducerConfiguration(); + conf.setBatchingEnabled(false); + conf.setMessageRoutingMode(MessageRoutingMode.SinglePartition); + // Set to false to prevent the server thread from being blocked if a lot of messages are pending. conf.setBlockIfQueueFull(false); diff --git a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java index 6a6130f29..108f71d80 100644 --- a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java +++ b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.tests.DockerUtils; @@ -72,7 +73,11 @@ public void testPublishCompactAndConsumeCLI() throws Exception { try (PulsarClient client = PulsarClient.create(serviceUrl)) { client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); - try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) { + try(Producer<byte[]> producer = client.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create()) { producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); 
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services