sijie closed pull request #2367: [compaction] make topic compaction works with partitioned topic
URL: https://github.com/apache/incubator-pulsar/pull/2367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c3befa50df..4c54018fb1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -73,6 +73,7 @@
     private final String appId;
     private AuthenticationDataSource authenticationData;
     private final String topicName;
+    private final int partitionIdx;
     private final InitialPosition subscriptionInitialPosition;
 
     private final long consumerId;
@@ -119,6 +120,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
         this.subscription = subscription;
         this.subType = subType;
         this.topicName = topicName;
+        this.partitionIdx = TopicName.getPartitionIndex(topicName);
         this.consumerId = consumerId;
         this.priorityLevel = priorityLevel;
         this.readCompacted = readCompacted;
@@ -239,8 +241,11 @@ public SendMessageInfo sendMessages(final List<Entry> entries, SendListener list
                 Entry entry = entries.get(i);
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
-                MessageIdData messageId = messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
-                        .build();
+                MessageIdData messageId = messageIdBuilder
+                    .setLedgerId(pos.getLedgerId())
+                    .setEntryId(pos.getEntryId())
+                    .setPartition(partitionIdx)
+                    .build();
 
                 ByteBuf metadataAndPayload = entry.getDataBuffer();
                 // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index e768c3e463..ae1a4dba17 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,8 +103,15 @@ public String toString() {
 
         RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
-            super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.BYTES);
+            super(client,
+                conf.getSingleTopic(),
+                conf,
+                client.externalExecutorProvider().getExecutor(),
+                TopicName.getPartitionIndex(conf.getSingleTopic()),
+                consumerFuture,
+                SubscriptionMode.Durable,
+                MessageId.earliest,
+                Schema.BYTES);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
@@ -172,8 +180,8 @@ private void reset() {
         @Override
         void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received raw message: {}/{}", topic, subscription,
-                          messageId.getLedgerId(), messageId.getEntryId());
+                log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
+                          messageId.getLedgerId(), messageId.getEntryId(), messageId.getPartition());
             }
             incomingRawMessages.add(
                     new RawMessageAndCnx(new RawMessageImpl(messageId, headersAndPayload), cnx));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index b4ee68a3a3..cc3f710249 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -146,11 +146,11 @@ private void phaseOneLoop(RawReader reader,
 
     private void scheduleTimeout(CompletableFuture<RawMessage> future) {
         Future<?> timeout = scheduler.schedule(() -> {
-                future.completeExceptionally(new TimeoutException("Timeout"));
-            }, 10, TimeUnit.SECONDS);
+            future.completeExceptionally(new TimeoutException("Timeout"));
+        }, 10, TimeUnit.SECONDS);
         future.whenComplete((res, exception) -> {
-                timeout.cancel(true);
-            });
+            timeout.cancel(true);
+        });
     }
 
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index bc8d4bfcdb..c1be8d1b2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -579,7 +579,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche
 
     private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception {
         Consumer consumer =
-                new Consumer(null, SubType.Shared, null, id, priority, ""+id, 5000,
+                new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000,
                         serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         try {
             consumer.flowPermits(permit);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 00f8af0b48..aafd12549f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
 
 public class ReaderImpl<T> implements Reader<T> {
 
@@ -82,8 +83,9 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
             consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
 
+        final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
     }
 
     @Override
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index fb76202e95..bf19dddaee 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -18,20 +18,31 @@
  */
 package org.apache.pulsar.tests.integration.compaction;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testng.annotations.Test;
+import org.testng.collections.Maps;
 
 /**
  * Test cases for compaction.
  */
+@Slf4j
 public class TestCompaction extends PulsarTestSuite {
 
     @Test(dataProvider = "ServiceUrls")
@@ -141,6 +152,134 @@ public void testPublishCompactAndConsumeRest(String serviceUrl) throws Exception
         }
     }
 
+    @Test(dataProvider = "ServiceUrls")
+    public void testPublishCompactAndConsumePartitionedTopics(String serviceUrl) throws Exception {
+
+        final String tenant = "compaction-test-partitioned-topic-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/partitioned-topic";
+        final int numKeys = 10;
+        final int numValuesPerKey = 10;
+        final String subscriptionName = "sub1";
+
+        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+        this.createNamespace(namespace);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace);
+
+        this.createPartitionedTopic(topic, 2);
+
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+            // force creating individual partitions
+            client.newConsumer().topic(topic + "-partition-0").subscriptionName(subscriptionName).subscribe().close();
+            client.newConsumer().topic(topic + "-partition-1").subscriptionName(subscriptionName).subscribe().close();
+
+            try(Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .messageRouter(new MessageRouter() {
+                    @Override
+                    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+                        return Integer.parseInt(msg.getKey()) % metadata.numPartitions();
+                    }
+                })
+                .create()
+            ) {
+                for (int i = 0; i < numKeys; i++) {
+                    for (int j = 0; j < numValuesPerKey; j++) {
+                        producer.newMessage()
+                            .key("" + i)
+                            .value(("key-" + i + "-value-" + j).getBytes(UTF_8))
+                            .send();
+                    }
+                    log.info("Successfully write {} values for key {}", numValuesPerKey, i);
+                }
+            }
+
+            // test even partition
+            consumePartition(
+                client,
+                topic + "-partition-0",
+                subscriptionName,
+                IntStream.range(0, numKeys).filter(i -> i % 2 == 0).boxed().collect(Collectors.toList()),
+                numValuesPerKey,
+                0);
+            // test odd partition
+            consumePartition(
+                client,
+                topic + "-partition-1",
+                subscriptionName,
+                IntStream.range(0, numKeys).filter(i -> i % 2 != 0).boxed().collect(Collectors.toList()),
+                numValuesPerKey,
+                0);
+
+
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compact", topic + "-partition-0");
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compact", topic + "-partition-1");
+
+            // wait for compaction to be completed. we don't need to sleep here, but sleep will reduce
+            // the times of polling compaction-status from brokers
+            Thread.sleep(30000);
+
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compaction-status", "-w", topic + "-partition-0");
+            pulsarCluster.runAdminCommandOnAnyBroker("topics",
+                "compaction-status", "-w", topic + "-partition-1");
+
+            Map<Integer, String> compactedData = consumeCompactedTopic(client, topic, subscriptionName, numKeys);
+            assertEquals(compactedData.size(), numKeys);
+            for (int i = 0; i < numKeys; i++) {
+                assertEquals(compactedData.get(i), "key-" + i + "-value-" + (numValuesPerKey - 1));
+            }
+        }
+    }
+
+    private static void consumePartition(PulsarClient client,
+                                         String topic,
+                                         String subscription,
+                                         List<Integer> keys,
+                                         int numValuesPerKey,
+                                         int startValue) throws PulsarClientException {
+        try (Consumer<byte[]> consumer = client.newConsumer()
+             .readCompacted(true)
+             .topic(topic)
+             .subscriptionName(subscription)
+             .subscribe()
+        ) {
+            for (Integer key : keys) {
+                for (int i = 0; i < numValuesPerKey; i++) {
+                    Message<byte[]> m = consumer.receive();
+                    assertEquals(m.getKey(), "" + key);
+                    assertEquals(new String(m.getValue(), UTF_8), "key-" + key + "-value-" + (startValue + i));
+                }
+                log.info("Read {} values from key {}", numValuesPerKey, key);
+            }
+
+        }
+    }
+
+    private static Map<Integer, String> consumeCompactedTopic(PulsarClient client,
+                                                              String topic,
+                                                              String subscription,
+                                                              int numKeys) throws PulsarClientException {
+        Map<Integer, String> keys = Maps.newHashMap();
+        try (Consumer<byte[]> consumer = client.newConsumer()
+             .readCompacted(true)
+             .topic(topic)
+             .subscriptionName(subscription)
+             .subscribe()
+        ) {
+            for (int i = 0; i < numKeys; i++) {
+                Message<byte[]> m = consumer.receive();
+                keys.put(Integer.parseInt(m.getKey()), new String(m.getValue(), UTF_8));
+            }
+        }
+        return keys;
+    }
+
     private static void waitAndVerifyCompacted(PulsarClient client, String topic,
                                                String sub, String expectedKey, String expectedValue) throws Exception {
         for (int i = 0; i < 60; i++) {
@@ -222,5 +361,16 @@ private ContainerExecResult createNamespace(final String Ns) throws Exception {
         return result;
     }
 
+    private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions)
+            throws Exception {
+        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+            "topics",
+            "create-partitioned-topic",
+            "--partitions", "" + numPartitions,
+            partitionedTopicName);
+        assertEquals(0, result.getExitCode());
+        return result;
+    }
+
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to