This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7680173  [compaction] make topic compaction works with partitioned 
topic (#2367)
7680173 is described below

commit 76801731f27e0289eef80959d90cd9165aaa99ed
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Aug 14 11:02:24 2018 -0700

    [compaction] make topic compaction works with partitioned topic (#2367)
    
    * [compaction] make topic compaction works with partitioned topic
    
     ### Motivation
    
    Topic compaction doesn't work with partitioned topics.
    
     ### Changes
    
    - make `RawReaderImpl` and `ReaderImpl` return messages with the partition index
    - make the broker service `Consumer` deliver `MessageIdData` with the partition index
    - add an integration test to ensure compaction works with partitioned topics
---
 .../org/apache/pulsar/broker/service/Consumer.java |   9 +-
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  16 ++-
 .../pulsar/compaction/TwoPhaseCompactor.java       |   8 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   2 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   4 +-
 .../integration/compaction/TestCompaction.java     | 150 +++++++++++++++++++++
 6 files changed, 177 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c3befa5..4c54018 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -73,6 +73,7 @@ public class Consumer {
     private final String appId;
     private AuthenticationDataSource authenticationData;
     private final String topicName;
+    private final int partitionIdx;
     private final InitialPosition subscriptionInitialPosition;
 
     private final long consumerId;
@@ -119,6 +120,7 @@ public class Consumer {
         this.subscription = subscription;
         this.subType = subType;
         this.topicName = topicName;
+        this.partitionIdx = TopicName.getPartitionIndex(topicName);
         this.consumerId = consumerId;
         this.priorityLevel = priorityLevel;
         this.readCompacted = readCompacted;
@@ -239,8 +241,11 @@ public class Consumer {
                 Entry entry = entries.get(i);
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 MessageIdData.Builder messageIdBuilder = 
MessageIdData.newBuilder();
-                MessageIdData messageId = 
messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
-                        .build();
+                MessageIdData messageId = messageIdBuilder
+                    .setLedgerId(pos.getLedgerId())
+                    .setEntryId(pos.getEntryId())
+                    .setPartition(partitionIdx)
+                    .build();
 
                 ByteBuf metadataAndPayload = entry.getDataBuffer();
                 // increment ref-count of data and release at the end of 
process: so, we can get chance to call entry.release
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index e768c3e..ae1a4db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader {
 
         RawConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<byte[]> conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
-            super(client, conf.getSingleTopic(), conf, 
client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, 
MessageId.earliest, Schema.BYTES);
+            super(client,
+                conf.getSingleTopic(),
+                conf,
+                client.externalExecutorProvider().getExecutor(),
+                TopicName.getPartitionIndex(conf.getSingleTopic()),
+                consumerFuture,
+                SubscriptionMode.Durable,
+                MessageId.earliest,
+                Schema.BYTES);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
@@ -172,8 +180,8 @@ public class RawReaderImpl implements RawReader {
         @Override
         void messageReceived(MessageIdData messageId, ByteBuf 
headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received raw message: {}/{}", topic, 
subscription,
-                          messageId.getLedgerId(), messageId.getEntryId());
+                log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, 
subscription,
+                          messageId.getEntryId(), messageId.getLedgerId(), 
messageId.getPartition());
             }
             incomingRawMessages.add(
                     new RawMessageAndCnx(new RawMessageImpl(messageId, 
headersAndPayload), cnx));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index b4ee68a..cc3f710 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -146,11 +146,11 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         Future<?> timeout = scheduler.schedule(() -> {
-                future.completeExceptionally(new TimeoutException("Timeout"));
-            }, 10, TimeUnit.SECONDS);
+            future.completeExceptionally(new TimeoutException("Timeout"));
+        }, 10, TimeUnit.SECONDS);
         future.whenComplete((res, exception) -> {
-                timeout.cancel(true);
-            });
+            timeout.cancel(true);
+        });
     }
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, 
MessageId to,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index bc8d4bf..c1be8d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -579,7 +579,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     private Consumer createConsumer(int priority, int permit, boolean blocked, 
int id) throws Exception {
         Consumer consumer =
-                new Consumer(null, SubType.Shared, null, id, priority, ""+id, 
5000,
+                new Consumer(null, SubType.Shared, "test-topic", id, priority, 
""+id, 5000,
                         serverCnx, "appId", Collections.emptyMap(), false /* 
read compacted */, InitialPosition.Latest);
         try {
             consumer.flowPermits(permit);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 00f8af0..aafd125 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
 
 public class ReaderImpl<T> implements Reader<T> {
 
@@ -82,8 +83,9 @@ public class ReaderImpl<T> implements Reader<T> {
             
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
 
+        final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, 
readerConfiguration.getStartMessageId(), schema);
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index fb76202..bf19ddd 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -18,20 +18,31 @@
  */
 package org.apache.pulsar.tests.integration.compaction;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
 
 /**
  * Test cases for compaction.
  */
+@Slf4j
 public class TestCompaction extends PulsarTestSuite {
 
     @Test(dataProvider = "ServiceUrls")
@@ -141,6 +152,134 @@ public class TestCompaction extends PulsarTestSuite {
         }
     }
 
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumePartitionedTopics(String 
serviceUrl) throws Exception {
+
+        final String tenant = "compaction-test-partitioned-topic-" + 
randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + 
"/partitioned-topic";
+        final int numKeys = 10;
+        final int numValuesPerKey = 10;
+        final String subscriptionName = "sub1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-clusters", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        this.createPartitionedTopic(topic, 2);
+
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+            // force creating individual partitions
+            client.newConsumer().topic(topic + 
"-partition-0").subscriptionName(subscriptionName).subscribe().close();
+            client.newConsumer().topic(topic + 
"-partition-1").subscriptionName(subscriptionName).subscribe().close();
+
+            try(Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata 
metadata) {
+                        return Integer.parseInt(msg.getKey()) % 
metadata.numPartitions();
+                    }
+                })
+                .create()
+            ) {
+                for (int i = 0; i < numKeys; i++) {
+                    for (int j = 0; j < numValuesPerKey; j++) {
+                        producer.newMessage()
+                            .key("" + i)
+                            .value(("key-" + i + "-value-" + 
j).getBytes(UTF_8))
+                            .send();
+                    }
+                    log.info("Successfully write {} values for key {}", 
numValuesPerKey, i);
+                }
+            }
+
+            // test even partition
+            consumePartition(
+                client,
+                topic + "-partition-0",
+                subscriptionName,
+                IntStream.range(0, numKeys).filter(i -> i % 2 == 
0).boxed().collect(Collectors.toList()),
+                numValuesPerKey,
+                0);
+            // test odd partition
+            consumePartition(
+                client,
+                topic + "-partition-1",
+                subscriptionName,
+                IntStream.range(0, numKeys).filter(i -> i % 2 != 
0).boxed().collect(Collectors.toList()),
+                numValuesPerKey,
+                0);
+
+
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compact", topic + "-partition-0");
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compact", topic + "-partition-1");
+
+            // wait for compaction to be completed. we don't need to sleep 
here, but sleep will reduce
+            // the times of polling compaction-status from brokers
+            Thread.sleep(30000);
+
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compaction-status", "-w", topic + "-partition-0");
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compaction-status", "-w", topic + "-partition-1");
+
+            Map<Integer, String> compactedData = consumeCompactedTopic(client, 
topic, subscriptionName, numKeys);
+            assertEquals(compactedData.size(), numKeys);
+            for (int i = 0; i < numKeys; i++) {
+                assertEquals("key-" + i + "-value-" + (numValuesPerKey - 1), 
compactedData.get(i));
+            }
+        }
+    }
+
+    private static void consumePartition(PulsarClient client,
+                                         String topic,
+                                         String subscription,
+                                         List<Integer> keys,
+                                         int numValuesPerKey,
+                                         int startValue) throws 
PulsarClientException {
+        try (Consumer<byte[]> consumer = client.newConsumer()
+             .readCompacted(true)
+             .topic(topic)
+             .subscriptionName(subscription)
+             .subscribe()
+        ) {
+            for (Integer key : keys) {
+                for (int i = 0; i < numValuesPerKey; i++) {
+                    Message<byte[]> m = consumer.receive();
+                    assertEquals("" + key, m.getKey());
+                    assertEquals("key-" + key + "-value-" + (startValue + i), 
new String(m.getValue(), UTF_8));
+                }
+                log.info("Read {} values from key {}", numValuesPerKey, key);
+            }
+
+        }
+    }
+
+    private static Map<Integer, String> consumeCompactedTopic(PulsarClient 
client,
+                                                              String topic,
+                                                              String 
subscription,
+                                                              int numKeys) 
throws PulsarClientException {
+        Map<Integer, String> keys = Maps.newHashMap();
+        try (Consumer<byte[]> consumer = client.newConsumer()
+             .readCompacted(true)
+             .topic(topic)
+             .subscriptionName(subscription)
+             .subscribe()
+        ) {
+            for (int i = 0; i < numKeys; i++) {
+                Message<byte[]> m = consumer.receive();
+                keys.put(Integer.parseInt(m.getKey()), new 
String(m.getValue(), UTF_8));
+            }
+        }
+        return keys;
+    }
+
     private static void waitAndVerifyCompacted(PulsarClient client, String 
topic,
                                                String sub, String expectedKey, 
String expectedValue) throws Exception {
         for (int i = 0; i < 60; i++) {
@@ -222,5 +361,16 @@ public class TestCompaction extends PulsarTestSuite {
         return result;
     }
 
+    private ContainerExecResult createPartitionedTopic(final String 
partitionedTopicName, int numPartitions)
+            throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "topics",
+            "create-partitioned-topic",
+            "--partitions", "" + numPartitions,
+            partitionedTopicName);
+        assertEquals(0, result.getExitCode());
+        return result;
+    }
+
 
 }

Reply via email to