This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new faa941632ce [fix] [tx] [branch-2.11] Transaction buffer recover
blocked by readNext (#18969)
faa941632ce is described below
commit faa941632ce785f86f00fc8fb6026297e14a1951
Author: fengyubiao <[email protected]>
AuthorDate: Mon Dec 19 12:02:50 2022 +0800
[fix] [tx] [branch-2.11] Transaction buffer recover blocked by readNext
(#18969)
### Motivation
Since PR #18833 cannot be cherry-picked to `branch-2.11`, this separate PR ports the fix.
#### Context for Transaction Buffer
- If `transactionCoordinatorEnabled` is turned on, a `TransactionBuffer`
will be initialized when a user topic is created.
- The `TransactionBuffer` reads the logs of aborted transactions from the topic
`__transaction_buffer_snapshot`; this process is called `recovery`.
- During recovery, the snapshot topic is read through a
`Reader`, which works like this:
```
while (reader.hasMessageAvailable()){
reader.readNext();
}
```
#### Context for Compaction
- Since
[pip-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction), a
consumer with the read-compacted feature enabled reads messages from the
compacted topic instead of the original topic once the compaction task is done,
and reads messages from the original topic while the compaction task is not done.
- If the data of the last message with key k sent to a topic is null, the
compactor will mark all messages for that key as deleted.
#### Issue
There is a race condition: after `reader.hasMessageAvailable` returns true,
the remaining messages may be deleted by the compaction task. `readNext` then
blocks indefinitely, because there are no messages left to read.
----
### Modifications
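- Replace the blocking `reader.readNext()` with `reader.readNextAsync()`, bounded by the system client's operation timeout (`operationTimeoutMs`). If the read times out (i.e. this issue is hit), recovery fails with `ServiceUnitNotReadyException` so that the recovery is performed again.
- Close the snapshot reader in a `finally` block so it is released on every exit path.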
----
#### Why not just let the client reload the topic to retry the
recovery?
If the topic load fails, the client receives an error response.
The broker can handle this failure itself, so users should not see it.
---
.../buffer/impl/TopicTransactionBuffer.java | 24 +++++--
.../TopicTransactionBufferRecoverTest.java | 81 ++++++++++++++++++++++
2 files changed, 101 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 48aad16340c..95c07f1e08d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -52,6 +53,7 @@ import
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSna
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
@@ -59,6 +61,7 @@ import
org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
@@ -615,6 +618,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
this.takeSnapshotWriter = takeSnapshotWriter;
}
+ private long getSystemClientOperationTimeoutMs() throws Exception {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl)
topic.getBrokerService().getPulsar().getClient();
+ return pulsarClient.getConfiguration().getOperationTimeoutMs();
+ }
+
@SneakyThrows
@Override
public void run() {
@@ -629,7 +637,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
- Message<TransactionBufferSnapshot> message
= reader.readNext();
+ Message<TransactionBufferSnapshot> message
= reader.readNextAsync()
+
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if
(topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot
transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null)
{
@@ -642,18 +651,25 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
if (!hasSnapshot) {
- closeReader(reader);
callBack.noNeedToRecover();
return;
}
+ } catch (TimeoutException ex) {
+ Throwable t =
FutureUtil.unwrapCompletionException(ex);
+ String errorMessage = String.format("[%s]
Transaction buffer recover fail by read "
+ + "transactionBufferSnapshot
timeout!", topic.getName());
+ log.error(errorMessage, t);
+ callBack.recoverExceptionally(
+ new
BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
+ return;
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover
fail when read "
+ "transactionBufferSnapshot!",
topic.getName(), ex);
callBack.recoverExceptionally(ex);
- closeReader(reader);
return;
+ } finally {
+ closeReader(reader);
}
- closeReader(reader);
ManagedCursor managedCursor;
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c0afdbee487..2ff3bf4acf2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -20,9 +20,12 @@ package org.apache.pulsar.broker.transaction;
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -33,9 +36,11 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -88,6 +93,7 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
protected void setup() throws Exception {
+ conf.getProperties().setProperty("brokerClient_operationTimeoutMs",
Integer.valueOf(10 * 1000).toString());
setUpBase(1, NUM_PARTITIONS, RECOVER_COMMIT, 0);
admin.topics().createNonPartitionedTopic(RECOVER_ABORT);
admin.topics().createNonPartitionedTopic(TAKE_SNAPSHOT);
@@ -224,6 +230,81 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
}
+ private void makeTBSnapshotReaderTimeoutIfFirstRead(TopicName topicName)
throws Exception {
+ SystemTopicClient.Reader mockReader =
mock(SystemTopicClient.Reader.class);
+ AtomicBoolean isFirstCallOfMethodHasMoreEvents = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNext = new AtomicBoolean();
+ AtomicBoolean isFirstCallOfMethodHasReadNextAsync = new
AtomicBoolean();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasMoreEvents.compareAndSet(false,true)){
+ return true;
+ } else {
+ return false;
+ }
+ }).when(mockReader).hasMoreEvents();
+
+ doAnswer(invocation -> {
+ if (isFirstCallOfMethodHasReadNext.compareAndSet(false, true)){
+ // Just stuck the thread.
+ Thread.sleep(3600 * 1000);
+ }
+ return null;
+ }).when(mockReader).readNext();
+
+ doAnswer(invocation -> {
+ CompletableFuture<Message> future = new CompletableFuture<>();
+ new Thread(() -> {
+ if (isFirstCallOfMethodHasReadNextAsync.compareAndSet(false,
true)){
+ // Just stuck the thread.
+ try {
+ Thread.sleep(3600 * 1000);
+ } catch (InterruptedException e) {
+ }
+ future.complete(null);
+ } else {
+ future.complete(null);
+ }
+ }).start();
+ return future;
+ }).when(mockReader).readNextAsync();
+
+
when(mockReader.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ for (PulsarService pulsarService : pulsarServiceList){
+ // Init prop: lastMessageIdInBroker.
+ final TransactionBufferSnapshotService tbSnapshotService =
+ pulsarService.getTransactionBufferSnapshotService();
+ TransactionBufferSnapshotService spyTbSnapshotService =
spy(tbSnapshotService);
+ doAnswer(invocation ->
CompletableFuture.completedFuture(mockReader))
+ .when(spyTbSnapshotService).createReader(topicName);
+ Field field =
+
PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+ field.setAccessible(true);
+ field.set(pulsarService, spyTbSnapshotService);
+ }
+ }
+
+ @Test(timeOut = 60 * 1000)
+ public void testTBRecoverCanRetryIfTimeoutRead() throws Exception {
+ String topicName = String.format("persistent://%s/%s", NAMESPACE1,
+ "tx_recover_" + UUID.randomUUID().toString().replaceAll("-",
"_"));
+
+ // Make race condition of "getLastMessageId" and "compaction" to make
recover can't complete.
+ makeTBSnapshotReaderTimeoutIfFirstRead(TopicName.get(topicName));
+ // Verify( Cmd-PRODUCER will wait for TB recover finished )
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .batchingMaxMessages(2)
+ .create();
+
+ // cleanup.
+ producer.close();
+ admin.topics().delete(topicName, false);
+ }
+
@Test
private void testTakeSnapshot() throws IOException, ExecutionException,
InterruptedException {