This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5eefdf10e563c32552772a3d50127c55ff18d557 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jun 22 23:34:49 2022 +0800 [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072) ### Motivation There were several issues caused by the thread safety problem of `LastCumulativeAck`, see: - https://github.com/apache/pulsar/pull/10586 - https://github.com/apache/pulsar/pull/12343 The root cause is that `LastCumulativeAck` can be accessed by different threads, especially in the `flushAsync` method, but its fields are accessed directly, so no thread safety can be guaranteed. In addition, the current `LastCumulativeAck` class was added in https://github.com/apache/pulsar/pull/8996 to hold two object references, but this modification is wrong. Before #8996, there were two CAS operations in the `doCumulativeAck` method in case it was called concurrently, though the composite CAS operation was not atomic. However, after #8996, only one CAS operation is performed, and it compares against a `LastCumulativeAck` object, not the two fields (`messageId` and `bitSetRecyclable`). Another issue is that a flag, `cumulativeAckFlushRequired`, is used to mark whether `lastCumulativeAck` should be flushed. However, if `flushAsync` is called concurrently, both calls would send ACK commands to the broker. ### Modifications To solve the thread safety issue, this PR moves `LastCumulativeAck` out of `PersistentAcknowledgmentsGroupingTracker` to disallow direct access to its internal fields. Then, the following synchronized methods are added to guarantee thread safety: - `update`: Guarantees safe write operations. It also recycles the `BitSetRecyclable` object before assigning new values and marks the instance as needing a flush. - `flush`: If a flush is required, returns a thread-local `LastCumulativeAck` instance that contains the message ID and the bit set. The bit set is deep copied to avoid the original reference being recycled in another `update` call. 
In addition, since the `messageId` field is volatile, the `getMessageId` method can always retrieve the latest reference. `LastCumulativeAckTest` is added to verify the semantics above. Based on the new design, we only need to maintain a `LastCumulativeAck` field in `PersistentAcknowledgmentsGroupingTracker` and call the related methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem that two concurrent `flushAsync` calls might send the same ACK command twice. (cherry picked from commit 936d6fdc780ea454e72e82b6c7a1885799158d02) --- .../PersistentAcknowledgmentsGroupingTracker.java | 141 +++++++++++---------- .../pulsar/client/impl/LastCumulativeAckTest.java | 86 +++++++++++++ 2 files changed, 159 insertions(+), 68 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index f0f0cfd7548..9829babece7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; -import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -34,9 +34,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; -import lombok.NonNull; +import lombok.Getter; import 
lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; @@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private volatile TimedCompletableFuture<Void> currentIndividualAckFuture; private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture; - private volatile LastCumulativeAck lastCumulativeAck = - LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null); - - private volatile boolean cumulativeAckFlushRequired = false; + private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); // When we flush the command, we should ensure current ack request will send correct private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck> - LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater( - PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck"); - /** * This is a set of all the individual acks that the application has issued and that were not already sent to * broker. @@ -116,13 +108,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * resent after a disconnection and for which the user has already sent an acknowledgement. 
*/ @Override - public boolean isDuplicate(@NonNull MessageId messageId) { - final MessageId messageIdOfLastAck = lastCumulativeAck.messageId; + public boolean isDuplicate(MessageId messageId) { + final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId(); if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) { // Already included in a cumulative ack return true; } else { - return pendingIndividualAcks.contains(messageId); + return pendingIndividualAcks.contains((MessageIdImpl) messageId); } } @@ -370,30 +362,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) { // Handle concurrent updates from different threads - LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet); - while (true) { - LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck; - if (msgId.compareTo(lastCumulativeAck.messageId) > 0) { - if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) { - if (lastCumulativeAck.bitSetRecyclable != null) { - try { - lastCumulativeAck.bitSetRecyclable.recycle(); - } catch (Exception ignore) { - // no-op - } - lastCumulativeAck.bitSetRecyclable = null; - } - lastCumulativeAck.recycle(); - // Successfully updated the last cumulative ack. Next flush iteration will send this to broker. 
- cumulativeAckFlushRequired = true; - return; - } - } else { - currentCumulativeAck.recycle(); - // message id acknowledging an before the current last cumulative ack - return; - } - } + lastCumulativeAck.update(msgId, bitSet); } private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId, @@ -474,15 +443,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments } private void flushAsync(ClientCnx cnx) { + final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush(); boolean shouldFlush = false; - if (cumulativeAckFlushRequired) { - newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId, - lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable, - AckType.Cumulative, null, Collections.emptyMap(), false, - this.currentCumulativeAckFuture, null); - this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId); + if (lastCumulativeAckToFlush != null) { shouldFlush = true; - cumulativeAckFlushRequired = false; + final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId(); + newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), + lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null, + Collections.emptyMap(), false, this.currentCumulativeAckFuture, null); + this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId); } // Flush all individual acks @@ -560,7 +529,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments @Override public void flushAndClean() { flush(); - lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null); + lastCumulativeAck.reset(); pendingIndividualAcks.clear(); } @@ -664,36 +633,72 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments return ackReceiptEnabled && cnx != null && 
Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion()); } +} - private static class LastCumulativeAck { - private MessageIdImpl messageId; - private BitSetRecyclable bitSetRecyclable; +@Getter +class LastCumulativeAck { - static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) { - LastCumulativeAck op = RECYCLER.get(); - op.messageId = messageId; - op.bitSetRecyclable = bitSetRecyclable; - return op; - } + // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called + public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK = + new FastThreadLocal<LastCumulativeAck>() { - private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) { - this.recyclerHandle = recyclerHandle; - } + @Override + protected LastCumulativeAck initialValue() { + return new LastCumulativeAck(); + } + }; + public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest; - void recycle() { - if (bitSetRecyclable != null) { + private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID; + private BitSetRecyclable bitSetRecyclable = null; + private boolean flushRequired = false; + + public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + if (messageId.compareTo(this.messageId) > 0) { + if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { this.bitSetRecyclable.recycle(); } - this.messageId = null; - recyclerHandle.recycle(this); + set(messageId, bitSetRecyclable); + flushRequired = true; } + } - private final Recycler.Handle<LastCumulativeAck> recyclerHandle; - private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() { - @Override - protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) { - return new LastCumulativeAck(handle); + public synchronized LastCumulativeAck flush() { + if 
(flushRequired) { + final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get(); + if (bitSetRecyclable != null) { + localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray())); + } else { + localLastCumulativeAck.set(this.messageId, null); } - }; + flushRequired = false; + return localLastCumulativeAck; + } else { + // Return null to indicate nothing to be flushed + return null; + } + } + + public synchronized void reset() { + if (bitSetRecyclable != null) { + bitSetRecyclable.recycle(); + } + messageId = DEFAULT_MESSAGE_ID; + bitSetRecyclable = null; + flushRequired = false; + } + + private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + this.messageId = messageId; + this.bitSetRecyclable = bitSetRecyclable; + } + + @Override + public String toString() { + String s = messageId.toString(); + if (bitSetRecyclable != null) { + s += " (bit set: " + bitSetRecyclable + ")"; + } + return s; } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java new file mode 100644 index 00000000000..102ccfc0e07 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.testng.annotations.Test; + +public class LastCumulativeAckTest { + + @Test + public void testUpdate() { + final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); + assertFalse(lastCumulativeAck.isFlushRequired()); + assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID); + assertNull(lastCumulativeAck.getBitSetRecyclable()); + + final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10); + final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create(); + bitSetRecyclable1.set(0, 3); + lastCumulativeAck.update(messageId1, bitSetRecyclable1); + assertTrue(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAck.getMessageId(), messageId1); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1); + + final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8); + lastCumulativeAck.update(messageId2, bitSetRecyclable1); + // bitSetRecyclable1 is not recycled + assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}"); + + final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create(); + bitSetRecyclable2.set(0, 2); + + // `update()` only accepts a newer message ID, so 
this call here has no side effect + lastCumulativeAck.update(messageId2, bitSetRecyclable2); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1); + + final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9); + lastCumulativeAck.update(messageId3, bitSetRecyclable2); + // bitSetRecyclable1 is recycled because it's replaced in `update` + assertEquals(bitSetRecyclable1.toString(), "{}"); + assertSame(lastCumulativeAck.getMessageId(), messageId3); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2); + bitSetRecyclable2.recycle(); + } + + @Test + public void testFlush() { + final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); + assertNull(lastCumulativeAck.flush()); + + final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3); + final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); + bitSetRecyclable.set(0, 3); + lastCumulativeAck.update(messageId, bitSetRecyclable); + assertTrue(lastCumulativeAck.isFlushRequired()); + + final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush(); + assertFalse(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAckToFlush.getMessageId(), messageId); + assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable); + assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable); + } + +}
