This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bee124e Add semantic integration tests for non-persistent topics and
effectively-once producing (#2059)
bee124e is described below
commit bee124e96c9695660a453e75983c3f53fc57bc87
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Jun 30 22:58:08 2018 -0700
Add semantic integration tests for non-persistent topics and
effectively-once producing (#2059)
---
.../pulsar/tests/topologies/PulsarCluster.java | 33 ++++++
.../tests/topologies/PulsarClusterTestBase.java | 59 ++++++++++
.../tests/integration/semantics/SemanticsTest.java | 126 ++++++++++++++++++++-
3 files changed, 212 insertions(+), 6 deletions(-)
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index a30509d..9e2860f 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -18,10 +18,12 @@
*/
package org.apache.pulsar.tests.topologies;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -34,6 +36,7 @@ import org.apache.pulsar.tests.containers.CSContainer;
import org.apache.pulsar.tests.containers.ProxyContainer;
import org.apache.pulsar.tests.containers.PulsarContainer;
import org.apache.pulsar.tests.containers.ZKContainer;
+import org.testcontainers.containers.Container.ExecResult;
import org.testcontainers.containers.Network;
/**
@@ -42,6 +45,8 @@ import org.testcontainers.containers.Network;
@Slf4j
public class PulsarCluster {
+ protected static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
+
/**
* Pulsar Cluster Spec.
*
@@ -169,4 +174,32 @@ public class PulsarCluster {
log.info("Failed to shutdown network for pulsar cluster {}",
clusterName, e);
}
}
+
+ public BrokerContainer getAnyBroker() {
+ List<BrokerContainer> brokerList = Lists.newArrayList();
+ brokerList.addAll(brokerContainers.values());
+ Collections.shuffle(brokerList);
+ checkArgument(!brokerList.isEmpty(), "No broker is alive");
+ return brokerList.get(0);
+ }
+
+ public ExecResult runAdminCommandOnAnyBroker(String...commands) throws
Exception {
+ BrokerContainer container = getAnyBroker();
+ String[] cmds = new String[commands.length + 1];
+ cmds[0] = ADMIN_SCRIPT;
+ System.arraycopy(commands, 0, cmds, 1, commands.length);
+ return container.execCmd(cmds);
+ }
+
+ public ExecResult createNamespace(String nsName) throws Exception {
+ return runAdminCommandOnAnyBroker(
+ "namespaces", "create", "public/" + nsName,
+ "--clusters", clusterName);
+ }
+
+ public ExecResult enableDeduplication(String nsName, boolean enabled)
throws Exception {
+ return runAdminCommandOnAnyBroker(
+ "namespaces", "set-deduplication", "public/" + nsName,
+ enabled ? "--enable" : "--disable");
+ }
}
diff --git
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index bee31b8..8cd49b1 100644
---
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -22,10 +22,37 @@ import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
@Slf4j
public class PulsarClusterTestBase {
+ @DataProvider(name = "ServiceUrlAndTopics")
+ public static Object[][] serviceUrlAndTopics() {
+ return new Object[][] {
+ // plain text, persistent topic
+ {
+ pulsarCluster.getPlainTextServiceUrl(),
+ true,
+ },
+ // plain text, non-persistent topic
+ {
+ pulsarCluster.getPlainTextServiceUrl(),
+ false
+ }
+ };
+ }
+
+ @DataProvider(name = "ServiceUrls")
+ public static Object[][] serviceUrls() {
+ return new Object[][] {
+ // plain text
+ {
+ pulsarCluster.getPlainTextServiceUrl()
+ }
+ };
+ }
+
protected static PulsarCluster pulsarCluster;
@BeforeClass
@@ -58,4 +85,36 @@ public class PulsarClusterTestBase {
}
}
+ protected static String randomName(int numChars) {
+ StringBuilder sb = new StringBuilder();;;;
+ for (int i = 0; i < 8; i++) {
+ sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+ }
+ return sb.toString();
+ }
+
+ protected static String generateNamespaceName() {
+ return "ns-" + randomName(8);
+ }
+
+ protected static String generateTopicName(String topicPrefix, boolean
isPersistent) {
+ return generateTopicName("default", topicPrefix, isPersistent);
+ }
+
+ protected static String generateTopicName(String namespace, String
topicPrefix, boolean isPersistent) {
+ String topicName = new StringBuilder(topicPrefix)
+ .append("-")
+ .append(randomName(8))
+ .append("-")
+ .append(System.currentTimeMillis())
+ .toString();
+ if (isPersistent) {
+ return "persistent://public/" + namespace + "/" + topicName;
+ } else {
+ return "non-persistent://public/" + namespace + "/" + topicName;
+ }
+ }
+
+
+
}
diff --git
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index a87f85a..1ba6533 100644
---
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -20,26 +20,31 @@ package org.apache.pulsar.tests.integration.semantics;
import static org.testng.Assert.assertEquals;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
import org.testng.annotations.Test;
/**
* Test pulsar produce/consume semantics
*/
+@Slf4j
public class SemanticsTest extends PulsarClusterTestBase {
- @Test
- public void testPublishAndConsumePlainTextServiceUrl() throws Exception {
- testPublishAndConsume(
- pulsarCluster.getPlainTextServiceUrl(),
"test-publish-consume-plain-text");
- }
+ //
+ // Test Basic Publish & Consume Operations
+ //
- private void testPublishAndConsume(String serviceUrl, String topicName)
throws Exception {
+ @Test(dataProvider = "ServiceUrlAndTopics")
+ public void testPublishAndConsume(String serviceUrl, boolean isPersistent)
throws Exception {
+ String topicName = generateTopicName("testpubconsume", isPersistent);
int numMessages = 10;
@@ -69,5 +74,114 @@ public class SemanticsTest extends PulsarClusterTestBase {
}
}
+ @Test(dataProvider = "ServiceUrls")
+ public void testEffectivelyOnceDisabled(String serviceUrl) throws
Exception {
+ String nsName = generateNamespaceName();
+ pulsarCluster.createNamespace(nsName);
+
+ String topicName = generateTopicName(nsName, "testeffectivelyonce",
true);
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .build();
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("effectively-once-producer")
+ .initialSequenceId(1L)
+ .create();
+
+ // send messages
+ sendMessagesIdempotency(producer);
+
+ // checkout the result
+ checkMessagesIdempotencyDisabled(consumer);
+ }
+
+ private static void sendMessagesIdempotency(Producer<String> producer)
throws Exception {
+ // sending message
+ producer.newMessage()
+ .sequenceId(1L)
+ .value("message-1")
+ .send();
+
+ // sending a duplicated message
+ producer.newMessage()
+ .sequenceId(1L)
+ .value("duplicated-message-1")
+ .send();
+ // sending a second message
+ producer.newMessage()
+ .sequenceId(2L)
+ .value("message-2")
+ .send();
+ }
+
+ private static void checkMessagesIdempotencyDisabled(Consumer<String>
consumer) throws Exception {
+ receiveAndAssertMessage(consumer, 1L, "message-1");
+ receiveAndAssertMessage(consumer, 1L, "duplicated-message-1");
+ receiveAndAssertMessage(consumer, 2L, "message-2");
+ }
+
+ private static void receiveAndAssertMessage(Consumer<String> consumer,
+ long expectedSequenceId,
+ String expectedContent) throws
Exception {
+ Message<String> msg = consumer.receive();
+ log.info("Received message {}", msg);
+ assertEquals(expectedSequenceId, msg.getSequenceId());
+ assertEquals(expectedContent, msg.getValue());
+ }
+
+ @Test(dataProvider = "ServiceUrls")
+ public void testEffectivelyOnceEnabled(String serviceUrl) throws Exception
{
+ String nsName = generateNamespaceName();
+ pulsarCluster.createNamespace(nsName);
+ pulsarCluster.enableDeduplication(nsName, true);
+
+ String topicName = generateTopicName(nsName, "testeffectivelyonce",
true);
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .build();
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("test-sub")
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(false)
+ .producerName("effectively-once-producer")
+ .initialSequenceId(1L)
+ .create();
+
+ // send messages
+ sendMessagesIdempotency(producer);
+
+ // checkout the result
+ checkMessagesIdempotencyEnabled(consumer);
+ }
+
+ private static void checkMessagesIdempotencyEnabled(Consumer<String>
consumer) throws Exception {
+ receiveAndAssertMessage(consumer, 1L, "message-1");
+ receiveAndAssertMessage(consumer, 2L, "message-2");
+ }
}