This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 98e2c66 ### Motivation (#12698)
98e2c66 is described below
commit 98e2c66a7b2d5fd42641527cd9ad6c3b497d65c7
Author: lipenghui <[email protected]>
AuthorDate: Wed Nov 10 15:58:51 2021 +0800
### Motivation (#12698)
Fix compacted data being lost because the compaction properties were dropped
during cursor reset.
1. The compaction reader seeks to the earliest position to read data
from the topic, but the compaction properties were dropped during the cursor reset.
As a result, the compaction subscription is initialized without a compaction horizon,
so the compaction reader skips the last compacted data. This only happens
when the compaction subscription is initialized, so it can be triggered by load
balancing or by unloading the topic manually.
2. Advancing the cursor should also keep the properties; otherwise, the
properties will be lost during cursor trimming.
### Changes
1. Keep the properties when resetting the cursor if the cursor is used for
data compaction.
2. Copy the properties to the new mark-delete entry when advancing the
cursor. This is triggered internally by the managed ledger, so it is not only for
compacted topics; the internal task should not lose the properties when trimming
the cursor.
### Tests
New tests were added to make sure that compaction does not lose data during topic
unloading and that the reader can read all the compacted data after the compaction
task completes.
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 10 +++-
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 2 +-
.../broker/intercept/CounterBrokerInterceptor.java | 8 +--
.../pulsar/compaction/CompactedTopicTest.java | 70 +++++++++++++++++++++-
4 files changed, 82 insertions(+), 8 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index dd715ff..f13fa93 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -194,6 +194,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private long entriesReadCount;
private long entriesReadSize;
private int individualDeletedMessagesSerializedSize;
+ private static final String COMPACTION_CURSOR_NAME = "__compaction";
class MarkDeleteEntry {
final PositionImpl newPosition;
@@ -1068,7 +1069,8 @@ public class ManagedCursorImpl implements ManagedCursor {
Range.closedOpen(markDeletePosition,
newMarkDeletePosition)));
}
markDeletePosition = newMarkDeletePosition;
- lastMarkDeleteEntry = new
MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
+ lastMarkDeleteEntry = new
MarkDeleteEntry(newMarkDeletePosition, isCompactionCursor() ?
+ getProperties() : Collections.emptyMap(),
null, null);
individualDeletedMessages.clear();
if (config.isDeletionAtBatchIndexLevelEnabled() &&
batchDeletedIndexes != null) {
@@ -1118,7 +1120,7 @@ public class ManagedCursorImpl implements ManagedCursor {
};
- internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new
MarkDeleteCallback() {
+ internalAsyncMarkDelete(newPosition, isCompactionCursor() ?
getProperties() : Collections.emptyMap(), new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
finalCallback.operationComplete();
@@ -3066,5 +3068,9 @@ public class ManagedCursorImpl implements ManagedCursor {
return isReadPositionOnTail || isReadPositionChanged;
}
+ private boolean isCompactionCursor() {
+ return COMPACTION_CURSOR_NAME.equals(name);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ManagedCursorImpl.class);
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d4d0aa3..629d10d 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2551,7 +2551,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
if (highestPositionToDelete.compareTo((PositionImpl)
cursor.getMarkDeletedPosition()) > 0
&& highestPositionToDelete.compareTo((PositionImpl)
cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
&& !(!cursor.isDurable() && cursor instanceof
NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
- cursor.asyncMarkDelete(highestPositionToDelete, new
MarkDeleteCallback() {
+ cursor.asyncMarkDelete(highestPositionToDelete,
cursor.getProperties(), new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index dc51c3d..1462cfa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -56,14 +56,14 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
- log.info("Send message to topic {}, subscription {}",
+ log.debug("Send message to topic {}, subscription {}",
subscription.getTopic(), subscription.getName());
beforeSendCount++;
}
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) {
- log.info("[{}] On [{}] Pulsar command", count,
command.getType().name());
+ log.debug("[{}] On [{}] Pulsar command", count,
command.getType().name());
count ++;
}
@@ -75,13 +75,13 @@ public class CounterBrokerInterceptor implements
BrokerInterceptor {
@Override
public void onWebserviceRequest(ServletRequest request) {
count ++;
- log.info("[{}] On [{}] Webservice request", count,
((HttpServletRequest)request).getRequestURL().toString());
+ log.debug("[{}] On [{}] Webservice request", count,
((HttpServletRequest)request).getRequestURL().toString());
}
@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse
response) {
count ++;
- log.info("[{}] On [{}] Webservice response {}", count,
((HttpServletRequest)request).getRequestURL().toString(), response);
+ log.debug("[{}] On [{}] Webservice response {}", count,
((HttpServletRequest)request).getRequestURL().toString(), response);
if (response instanceof Response) {
Response res = (Response) response;
responseList.add(new
ResponseEvent(res.getHttpChannel().getRequest().getRequestURI(),
res.getStatus()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index cbe7372..69d66d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -33,6 +33,7 @@ import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
@@ -577,9 +578,76 @@ public class CompactedTopicTest extends
MockedPulsarServiceBaseTest {
// The reader should read all 600 keys
int received = 0;
while (reader.hasMessageAvailable()) {
- System.out.println(reader.readNext().getKey());
+ reader.readNext();
received++;
}
Assert.assertEquals(received, keys * 3);
+ reader.close();
+ producer.close();
+ }
+
+ @Test(timeOut = 120000)
+ public void testCompactionWithTopicUnloading() throws Exception {
+ String topic =
"persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" +
+ UUID.randomUUID();
+ final int numMessages = 2000;
+ final int keys = 500;
+ final String msg = "Test";
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .blockIfQueueFull(true)
+ .maxPendingMessages(numMessages)
+ .enableBatching(false)
+ .create();
+ CompletableFuture<MessageId> lastMessage = null;
+ for (int i = 0; i < numMessages; ++i) {
+ lastMessage = producer.newMessage().key(i % keys +
"").value(msg).sendAsync();
+ }
+ producer.flush();
+ lastMessage.join();
+ admin.topics().triggerCompaction(topic);
+ Awaitility.await().pollInterval(5, TimeUnit.SECONDS).untilAsserted(()
-> {
+ PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
+ Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+ Assert.assertEquals(stats.compactedLedger.entries, keys);
+ Assert.assertEquals(admin.topics().getStats(topic)
+
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+ });
+
+ admin.topics().unload(topic);
+ for (int i = 0; i < numMessages; ++i) {
+ lastMessage = producer.newMessage().key((i % keys + keys) +
"").value(msg).sendAsync();
+ }
+ producer.flush();
+ lastMessage.join();
+ admin.topics().triggerCompaction(topic);
+ Thread.sleep(100);
+ admin.topics().unload(topic);
+ admin.topics().triggerCompaction(topic);
+ Awaitility.await().pollInterval(3, TimeUnit.SECONDS).atMost(30,
TimeUnit.SECONDS).untilAsserted(() -> {
+ PersistentTopicInternalStats stats =
admin.topics().getInternalStats(topic);
+ Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+ Assert.assertEquals(stats.compactedLedger.entries, keys * 2);
+ Assert.assertEquals(admin.topics().getStats(topic)
+
.getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+ });
+
+ // Start a new reader to reading messages
+ Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .readCompacted(true)
+ .receiverQueueSize(10)
+ .create();
+
+ // The reader should read all 600 keys
+ int received = 0;
+ while (reader.hasMessageAvailable()) {
+ reader.readNext();
+ received++;
+ }
+ Assert.assertEquals(received, keys * 2);
+ reader.close();
+ producer.close();
}
}