This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7680173 [compaction] make topic compaction works with partitioned
topic (#2367)
7680173 is described below
commit 76801731f27e0289eef80959d90cd9165aaa99ed
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Aug 14 11:02:24 2018 -0700
[compaction] make topic compaction works with partitioned topic (#2367)
* [compaction] make topic compaction works with partitioned topic
### Motivation
Topic compaction doesn't work with partitioned topics.
### Changes
- make `RawReaderImpl` and `ReaderImpl` return messages with the partition idx
- make broker service `Consumer` deliver MessageIdData with partition idx
- add an integration test to ensure compaction works with partitioned topics
---
.../org/apache/pulsar/broker/service/Consumer.java | 9 +-
.../apache/pulsar/client/impl/RawReaderImpl.java | 16 ++-
.../pulsar/compaction/TwoPhaseCompactor.java | 8 +-
.../PersistentDispatcherFailoverConsumerTest.java | 2 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 4 +-
.../integration/compaction/TestCompaction.java | 150 +++++++++++++++++++++
6 files changed, 177 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c3befa5..4c54018 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -73,6 +73,7 @@ public class Consumer {
private final String appId;
private AuthenticationDataSource authenticationData;
private final String topicName;
+ private final int partitionIdx;
private final InitialPosition subscriptionInitialPosition;
private final long consumerId;
@@ -119,6 +120,7 @@ public class Consumer {
this.subscription = subscription;
this.subType = subType;
this.topicName = topicName;
+ this.partitionIdx = TopicName.getPartitionIndex(topicName);
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.readCompacted = readCompacted;
@@ -239,8 +241,11 @@ public class Consumer {
Entry entry = entries.get(i);
PositionImpl pos = (PositionImpl) entry.getPosition();
MessageIdData.Builder messageIdBuilder =
MessageIdData.newBuilder();
- MessageIdData messageId =
messageIdBuilder.setLedgerId(pos.getLedgerId()).setEntryId(pos.getEntryId())
- .build();
+ MessageIdData messageId = messageIdBuilder
+ .setLedgerId(pos.getLedgerId())
+ .setEntryId(pos.getEntryId())
+ .setPartition(partitionIdx)
+ .build();
ByteBuf metadataAndPayload = entry.getDataBuffer();
// increment ref-count of data and release at the end of
process: so, we can get chance to call entry.release
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index e768c3e..ae1a4db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,8 +103,15 @@ public class RawReaderImpl implements RawReader {
RawConsumerImpl(PulsarClientImpl client,
ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
- super(client, conf.getSingleTopic(), conf,
client.externalExecutorProvider().getExecutor(), -1,
- consumerFuture, SubscriptionMode.Durable,
MessageId.earliest, Schema.BYTES);
+ super(client,
+ conf.getSingleTopic(),
+ conf,
+ client.externalExecutorProvider().getExecutor(),
+ TopicName.getPartitionIndex(conf.getSingleTopic()),
+ consumerFuture,
+ SubscriptionMode.Durable,
+ MessageId.earliest,
+ Schema.BYTES);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
@@ -172,8 +180,8 @@ public class RawReaderImpl implements RawReader {
@Override
void messageReceived(MessageIdData messageId, ByteBuf
headersAndPayload, ClientCnx cnx) {
if (log.isDebugEnabled()) {
- log.debug("[{}][{}] Received raw message: {}/{}", topic,
subscription,
- messageId.getLedgerId(), messageId.getEntryId());
+ log.debug("[{}][{}] Received raw message: {}/{}/{}", topic,
subscription,
+ messageId.getEntryId(), messageId.getLedgerId(),
messageId.getPartition());
}
incomingRawMessages.add(
new RawMessageAndCnx(new RawMessageImpl(messageId,
headersAndPayload), cnx));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index b4ee68a..cc3f710 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -146,11 +146,11 @@ public class TwoPhaseCompactor extends Compactor {
private void scheduleTimeout(CompletableFuture<RawMessage> future) {
Future<?> timeout = scheduler.schedule(() -> {
- future.completeExceptionally(new TimeoutException("Timeout"));
- }, 10, TimeUnit.SECONDS);
+ future.completeExceptionally(new TimeoutException("Timeout"));
+ }, 10, TimeUnit.SECONDS);
future.whenComplete((res, exception) -> {
- timeout.cancel(true);
- });
+ timeout.cancel(true);
+ });
}
private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from,
MessageId to,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index bc8d4bf..c1be8d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -579,7 +579,7 @@ public class PersistentDispatcherFailoverConsumerTest {
private Consumer createConsumer(int priority, int permit, boolean blocked,
int id) throws Exception {
Consumer consumer =
- new Consumer(null, SubType.Shared, null, id, priority, ""+id,
5000,
+ new Consumer(null, SubType.Shared, "test-topic", id, priority,
""+id, 5000,
serverCnx, "appId", Collections.emptyMap(), false /*
read compacted */, InitialPosition.Latest);
try {
consumer.flowPermits(permit);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 00f8af0..aafd125 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
public class ReaderImpl<T> implements Reader<T> {
@@ -82,8 +83,9 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
+ final int partitionIdx =
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client,
readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- -1, consumerFuture, SubscriptionMode.NonDurable,
readerConfiguration.getStartMessageId(), schema);
+ partitionIdx, consumerFuture, SubscriptionMode.NonDurable,
readerConfiguration.getStartMessageId(), schema);
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
index fb76202..bf19ddd 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
@@ -18,20 +18,31 @@
*/
package org.apache.pulsar.tests.integration.compaction;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testng.annotations.Test;
+import org.testng.collections.Maps;
/**
* Test cases for compaction.
*/
+@Slf4j
public class TestCompaction extends PulsarTestSuite {
@Test(dataProvider = "ServiceUrls")
@@ -141,6 +152,134 @@ public class TestCompaction extends PulsarTestSuite {
}
}
+ @Test(dataProvider = "ServiceUrls")
+ public void testPublishCompactAndConsumePartitionedTopics(String
serviceUrl) throws Exception {
+
+ final String tenant = "compaction-test-partitioned-topic-" +
randomName(4);
+ final String namespace = tenant + "/ns1";
+ final String topic = "persistent://" + namespace +
"/partitioned-topic";
+ final int numKeys = 10;
+ final int numValuesPerKey = 10;
+ final String subscriptionName = "sub1";
+
+ this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
+
+ this.createNamespace(namespace);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-clusters", "--clusters", pulsarCluster.getClusterName(),
namespace);
+
+ this.createPartitionedTopic(topic, 2);
+
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+ // force creating individual partitions
+ client.newConsumer().topic(topic +
"-partition-0").subscriptionName(subscriptionName).subscribe().close();
+ client.newConsumer().topic(topic +
"-partition-1").subscriptionName(subscriptionName).subscribe().close();
+
+ try(Producer<byte[]> producer = client.newProducer()
+ .topic(topic)
+ .messageRouter(new MessageRouter() {
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
+ return Integer.parseInt(msg.getKey()) %
metadata.numPartitions();
+ }
+ })
+ .create()
+ ) {
+ for (int i = 0; i < numKeys; i++) {
+ for (int j = 0; j < numValuesPerKey; j++) {
+ producer.newMessage()
+ .key("" + i)
+ .value(("key-" + i + "-value-" +
j).getBytes(UTF_8))
+ .send();
+ }
+ log.info("Successfully write {} values for key {}",
numValuesPerKey, i);
+ }
+ }
+
+ // test even partition
+ consumePartition(
+ client,
+ topic + "-partition-0",
+ subscriptionName,
+ IntStream.range(0, numKeys).filter(i -> i % 2 ==
0).boxed().collect(Collectors.toList()),
+ numValuesPerKey,
+ 0);
+ // test odd partition
+ consumePartition(
+ client,
+ topic + "-partition-1",
+ subscriptionName,
+ IntStream.range(0, numKeys).filter(i -> i % 2 !=
0).boxed().collect(Collectors.toList()),
+ numValuesPerKey,
+ 0);
+
+
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "compact", topic + "-partition-0");
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "compact", topic + "-partition-1");
+
+ // wait for compaction to be completed. we don't need to sleep
here, but sleep will reduce
+ // the times of polling compaction-status from brokers
+ Thread.sleep(30000);
+
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "compaction-status", "-w", topic + "-partition-0");
+ pulsarCluster.runAdminCommandOnAnyBroker("topics",
+ "compaction-status", "-w", topic + "-partition-1");
+
+ Map<Integer, String> compactedData = consumeCompactedTopic(client,
topic, subscriptionName, numKeys);
+ assertEquals(compactedData.size(), numKeys);
+ for (int i = 0; i < numKeys; i++) {
+ assertEquals("key-" + i + "-value-" + (numValuesPerKey - 1),
compactedData.get(i));
+ }
+ }
+ }
+
+ private static void consumePartition(PulsarClient client,
+ String topic,
+ String subscription,
+ List<Integer> keys,
+ int numValuesPerKey,
+ int startValue) throws
PulsarClientException {
+ try (Consumer<byte[]> consumer = client.newConsumer()
+ .readCompacted(true)
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe()
+ ) {
+ for (Integer key : keys) {
+ for (int i = 0; i < numValuesPerKey; i++) {
+ Message<byte[]> m = consumer.receive();
+ assertEquals("" + key, m.getKey());
+ assertEquals("key-" + key + "-value-" + (startValue + i),
new String(m.getValue(), UTF_8));
+ }
+ log.info("Read {} values from key {}", numValuesPerKey, key);
+ }
+
+ }
+ }
+
+ private static Map<Integer, String> consumeCompactedTopic(PulsarClient
client,
+ String topic,
+ String
subscription,
+ int numKeys)
throws PulsarClientException {
+ Map<Integer, String> keys = Maps.newHashMap();
+ try (Consumer<byte[]> consumer = client.newConsumer()
+ .readCompacted(true)
+ .topic(topic)
+ .subscriptionName(subscription)
+ .subscribe()
+ ) {
+ for (int i = 0; i < numKeys; i++) {
+ Message<byte[]> m = consumer.receive();
+ keys.put(Integer.parseInt(m.getKey()), new
String(m.getValue(), UTF_8));
+ }
+ }
+ return keys;
+ }
+
private static void waitAndVerifyCompacted(PulsarClient client, String
topic,
String sub, String expectedKey,
String expectedValue) throws Exception {
for (int i = 0; i < 60; i++) {
@@ -222,5 +361,16 @@ public class TestCompaction extends PulsarTestSuite {
return result;
}
+ private ContainerExecResult createPartitionedTopic(final String
partitionedTopicName, int numPartitions)
+ throws Exception {
+ ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
+ "topics",
+ "create-partitioned-topic",
+ "--partitions", "" + numPartitions,
+ partitionedTopicName);
+ assertEquals(0, result.getExitCode());
+ return result;
+ }
+
}