This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bee124e  Add semantic integration tests for non-persistent topics and 
effectively-once producing (#2059)
bee124e is described below

commit bee124e96c9695660a453e75983c3f53fc57bc87
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Jun 30 22:58:08 2018 -0700

    Add semantic integration tests for non-persistent topics and 
effectively-once producing (#2059)
---
 .../pulsar/tests/topologies/PulsarCluster.java     |  33 ++++++
 .../tests/topologies/PulsarClusterTestBase.java    |  59 ++++++++++
 .../tests/integration/semantics/SemanticsTest.java | 126 ++++++++++++++++++++-
 3 files changed, 212 insertions(+), 6 deletions(-)

diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
index a30509d..9e2860f 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarCluster.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.tests.topologies;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.tests.containers.PulsarContainer.CS_PORT;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -34,6 +36,7 @@ import org.apache.pulsar.tests.containers.CSContainer;
 import org.apache.pulsar.tests.containers.ProxyContainer;
 import org.apache.pulsar.tests.containers.PulsarContainer;
 import org.apache.pulsar.tests.containers.ZKContainer;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.Network;
 
 /**
@@ -42,6 +45,8 @@ import org.testcontainers.containers.Network;
 @Slf4j
 public class PulsarCluster {
 
+    protected static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
+
     /**
      * Pulsar Cluster Spec.
      *
@@ -169,4 +174,32 @@ public class PulsarCluster {
             log.info("Failed to shutdown network for pulsar cluster {}", 
clusterName, e);
         }
     }
+
+    public BrokerContainer getAnyBroker() {
+        List<BrokerContainer> brokerList = Lists.newArrayList();
+        brokerList.addAll(brokerContainers.values());
+        Collections.shuffle(brokerList);
+        checkArgument(!brokerList.isEmpty(), "No broker is alive");
+        return brokerList.get(0);
+    }
+
+    public ExecResult runAdminCommandOnAnyBroker(String...commands) throws 
Exception {
+        BrokerContainer container = getAnyBroker();
+        String[] cmds = new String[commands.length + 1];
+        cmds[0] = ADMIN_SCRIPT;
+        System.arraycopy(commands, 0, cmds, 1, commands.length);
+        return container.execCmd(cmds);
+    }
+
+    public ExecResult createNamespace(String nsName) throws Exception {
+        return runAdminCommandOnAnyBroker(
+            "namespaces", "create", "public/" + nsName,
+            "--clusters", clusterName);
+    }
+
+    public ExecResult enableDeduplication(String nsName, boolean enabled) 
throws Exception {
+        return runAdminCommandOnAnyBroker(
+            "namespaces", "set-deduplication", "public/" + nsName,
+            enabled ? "--enable" : "--disable");
+    }
 }
diff --git 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
index bee31b8..8cd49b1 100644
--- 
a/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration-tests-topologies/src/main/java/org/apache/pulsar/tests/topologies/PulsarClusterTestBase.java
@@ -22,10 +22,37 @@ import java.util.concurrent.ThreadLocalRandom;
 import lombok.extern.slf4j.Slf4j;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 
 @Slf4j
 public class PulsarClusterTestBase {
 
+    @DataProvider(name = "ServiceUrlAndTopics")
+    public static Object[][] serviceUrlAndTopics() {
+        return new Object[][] {
+            // plain text, persistent topic
+            {
+                pulsarCluster.getPlainTextServiceUrl(),
+                true,
+            },
+            // plain text, non-persistent topic
+            {
+                pulsarCluster.getPlainTextServiceUrl(),
+                false
+            }
+        };
+    }
+
+    @DataProvider(name = "ServiceUrls")
+    public static Object[][] serviceUrls() {
+        return new Object[][] {
+            // plain text
+            {
+                pulsarCluster.getPlainTextServiceUrl()
+            }
+        };
+    }
+
     protected static PulsarCluster pulsarCluster;
 
     @BeforeClass
@@ -58,4 +85,36 @@ public class PulsarClusterTestBase {
         }
     }
 
+    protected static String randomName(int numChars) {
+        StringBuilder sb = new StringBuilder();;;;
+        for (int i = 0; i < 8; i++) {
+            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 'a'));
+        }
+        return sb.toString();
+    }
+
+    protected static String generateNamespaceName() {
+        return "ns-" + randomName(8);
+    }
+
+    protected static String generateTopicName(String topicPrefix, boolean 
isPersistent) {
+        return generateTopicName("default", topicPrefix, isPersistent);
+    }
+
+    protected static String generateTopicName(String namespace, String 
topicPrefix, boolean isPersistent) {
+        String topicName = new StringBuilder(topicPrefix)
+            .append("-")
+            .append(randomName(8))
+            .append("-")
+            .append(System.currentTimeMillis())
+            .toString();
+        if (isPersistent) {
+            return "persistent://public/" + namespace + "/" + topicName;
+        } else {
+            return "non-persistent://public/" + namespace + "/" + topicName;
+        }
+    }
+
+
+
 }
diff --git 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
index a87f85a..1ba6533 100644
--- 
a/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
+++ 
b/tests/integration/semantics/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java
@@ -20,26 +20,31 @@ package org.apache.pulsar.tests.integration.semantics;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.tests.topologies.PulsarClusterTestBase;
 import org.testng.annotations.Test;
 
 /**
  * Test pulsar produce/consume semantics
  */
+@Slf4j
 public class SemanticsTest extends PulsarClusterTestBase {
 
-    @Test
-    public void testPublishAndConsumePlainTextServiceUrl() throws Exception {
-        testPublishAndConsume(
-            pulsarCluster.getPlainTextServiceUrl(), 
"test-publish-consume-plain-text");
-    }
+    //
+    // Test Basic Publish & Consume Operations
+    //
 
-    private void testPublishAndConsume(String serviceUrl, String topicName) 
throws Exception {
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) 
throws Exception {
+        String topicName = generateTopicName("testpubconsume", isPersistent);
 
         int numMessages = 10;
 
@@ -69,5 +74,114 @@ public class SemanticsTest extends PulsarClusterTestBase {
         }
     }
 
+    @Test(dataProvider = "ServiceUrls")
+    public void testEffectivelyOnceDisabled(String serviceUrl) throws 
Exception {
+        String nsName = generateNamespaceName();
+        pulsarCluster.createNamespace(nsName);
+
+        String topicName = generateTopicName(nsName, "testeffectivelyonce", 
true);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .ackTimeout(10, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(topicName)
+            .enableBatching(false)
+            .producerName("effectively-once-producer")
+            .initialSequenceId(1L)
+            .create();
+
+        // send messages
+        sendMessagesIdempotency(producer);
+
+        // checkout the result
+        checkMessagesIdempotencyDisabled(consumer);
+    }
+
+    private static void sendMessagesIdempotency(Producer<String> producer) 
throws Exception {
+        // sending message
+        producer.newMessage()
+            .sequenceId(1L)
+            .value("message-1")
+            .send();
+
+        // sending a duplicated message
+        producer.newMessage()
+            .sequenceId(1L)
+            .value("duplicated-message-1")
+            .send();
 
+        // sending a second message
+        producer.newMessage()
+            .sequenceId(2L)
+            .value("message-2")
+            .send();
+    }
+
+    private static void checkMessagesIdempotencyDisabled(Consumer<String> 
consumer) throws Exception {
+        receiveAndAssertMessage(consumer, 1L, "message-1");
+        receiveAndAssertMessage(consumer, 1L, "duplicated-message-1");
+        receiveAndAssertMessage(consumer, 2L, "message-2");
+    }
+
+    private static void receiveAndAssertMessage(Consumer<String> consumer,
+                                                long expectedSequenceId,
+                                                String expectedContent) throws 
Exception {
+        Message<String> msg = consumer.receive();
+        log.info("Received message {}", msg);
+        assertEquals(expectedSequenceId, msg.getSequenceId());
+        assertEquals(expectedContent, msg.getValue());
+    }
+
+    @Test(dataProvider = "ServiceUrls")
+    public void testEffectivelyOnceEnabled(String serviceUrl) throws Exception 
{
+        String nsName = generateNamespaceName();
+        pulsarCluster.createNamespace(nsName);
+        pulsarCluster.enableDeduplication(nsName, true);
+
+        String topicName = generateTopicName(nsName, "testeffectivelyonce", 
true);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(serviceUrl)
+            .build();
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(topicName)
+            .subscriptionName("test-sub")
+            .ackTimeout(10, TimeUnit.SECONDS)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscribe();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(topicName)
+            .enableBatching(false)
+            .producerName("effectively-once-producer")
+            .initialSequenceId(1L)
+            .create();
+
+        // send messages
+        sendMessagesIdempotency(producer);
+
+        // checkout the result
+        checkMessagesIdempotencyEnabled(consumer);
+    }
+
+    private static void checkMessagesIdempotencyEnabled(Consumer<String> 
consumer) throws Exception {
+        receiveAndAssertMessage(consumer, 1L, "message-1");
+        receiveAndAssertMessage(consumer, 2L, "message-2");
+    }
 }

Reply via email to