This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c710c187d266f0968ad48601636c653912ad5b49 Author: hangc0276 <[email protected]> AuthorDate: Fri Jul 2 21:34:10 2021 +0800 fix compaction entry read exception (#11175) (cherry picked from commit c716495a4282093bd81f9cd43f909811213bfb32) --- .../pulsar/compaction/CompactedTopicImpl.java | 1 + .../pulsar/compaction/CompactedTopicTest.java | 57 +++++++++++++++++++++- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index e0755a7..12748e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -109,6 +109,7 @@ public class CompactedTopicImpl implements CompactedTopic { if (startPoint == NEWER_THAN_COMPACTED) { cursor.seek(compactionHorizon.getNext()); callback.readEntriesComplete(Collections.emptyList(), ctx); + return CompletableFuture.completedFuture(null); } return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java index 2d3411b..410d1e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java @@ -46,14 +46,16 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import 
org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.RawMessageImpl; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; @@ -305,4 +307,57 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest { producer.close(); } + + @Test(timeOut = 30000) + public void testReadMessageFromCompactedLedger() throws Exception { + final String key = "1"; + String msg = "test compaction msg"; + final String topic = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 1); + final int numMessages = 10; + + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + for (int i = 0; i < numMessages; ++i) { + producer.newMessage().key(key).value(msg).send(); + } + + admin.topics().triggerCompaction(topic); + boolean succeed = retryStrategically((test) -> { + try { + return LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status); + } catch (PulsarAdminException e) { + return false; + } + }, 10, 200); + + Assert.assertTrue(succeed); + + + final String newKey = "2"; + String newMsg = "test compaction msg v2"; + for (int i = 0; i < numMessages; ++i) { + producer.newMessage().key(newKey).value(newMsg).send(); + } + + Reader<String> reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test") + .readCompacted(true) + .startMessageId(MessageId.earliest) + .create(); + + int compactedMsgCount = 0; + int nonCompactedMsgCount = 0; + while (reader.hasMessageAvailable()) { + Message<String> message = 
reader.readNext(); + if (key.equals(message.getKey()) && msg.equals(message.getValue())) { + compactedMsgCount++; + } else if (newKey.equals(message.getKey()) && newMsg.equals(message.getValue())) { + nonCompactedMsgCount++; + } + } + + Assert.assertEquals(compactedMsgCount, 1); + Assert.assertEquals(nonCompactedMsgCount, numMessages); + } }
