eolivelli commented on a change in pull request #8750:
URL: https://github.com/apache/pulsar/pull/8750#discussion_r533946688
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
##########
@@ -129,6 +129,20 @@ public boolean equals(Object obj) {
return false;
}
+ public static PositionImpl convertStringToPosition(String positionString) {
+ if (positionString == null) {
+ throw new NullPointerException();
+ } else {
+ String[] strings = positionString.split(":");
+ if (strings.length != 2) {
+ throw new IndexOutOfBoundsException();
+ }
+ long ledgerId = Long.parseLong(strings[0]);
+ long entryId = Long.parseLong(strings[1]);
Review comment:
Can you please catch "NumberFormatException" and then rethrow it with a
meaningful error message?
##########
File path:
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java
##########
@@ -129,6 +129,20 @@ public boolean equals(Object obj) {
return false;
}
+ public static PositionImpl convertStringToPosition(String positionString) {
+ if (positionString == null) {
+ throw new NullPointerException();
+ } else {
+ String[] strings = positionString.split(":");
+ if (strings.length != 2) {
+ throw new IndexOutOfBoundsException();
Review comment:
Can you add a message, like "invalid position " + positionString?
It will ease debugging issues in the future.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import
org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
/**
* Transaction buffer based on normal persistent topic.
*/
@Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState
implements TransactionBuffer {
private final PersistentTopic topic;
- public TopicTransactionBuffer(PersistentTopic topic) {
+ private final SpscArrayQueue<Entry> entryQueue;
+
+ private final ConcurrentOpenHashMap<TxnID,
ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new
ConcurrentOpenHashMap<>();
+
+ //this is for transaction buffer replay start position
+ //this will be stored in managed ledger properties and every 10000 will
sync to zk by default.
+ private final ConcurrentSkipListSet<PositionImpl> positionsSort = new
ConcurrentSkipListSet<>();
+
+ private final ManagedCursor cursor;
+
+ //this is for replay
+ private final PositionImpl lastConfirmedEntry;
+ //this if for replay
+ private PositionImpl currentLoadPosition;
+
+ private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+ private final static String TXN_ON_GOING_POSITION_SUFFIX =
"-txnOnGoingPosition";
+
+ private final String txnOnGoingPositionName;
+
+ //TODO this can config
+ private int defaultCountToSyncPosition = 10000;
+
+ public TopicTransactionBuffer(PersistentTopic topic) throws
ManagedLedgerException {
+ super(State.None);
+ this.entryQueue = new SpscArrayQueue<>(2000);
this.topic = topic;
+ ManagedLedger managedLedger = topic.getManagedLedger();
+ this.lastConfirmedEntry = (PositionImpl)
managedLedger.getLastConfirmedEntry();
+ this.txnOnGoingPositionName = topic.getName() +
TXN_ON_GOING_POSITION_SUFFIX;
+ String positionString =
managedLedger.getProperties().get(txnOnGoingPositionName);
+ if (positionString == null) {
+ this.currentLoadPosition = PositionImpl.earliest;
+ } else {
+ PositionImpl position = PositionImpl.earliest;
+ try {
+ position =
PositionImpl.convertStringToPosition(positionString);
+ } catch (Exception e) {
+ log.error("Topic : [{}] transaction buffer get replay start
position error!", topic.getName());
+ }
+ this.currentLoadPosition = position;
+ }
+ this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+ new Thread(() -> new TopicTransactionBufferReplayer(new
TransactionBufferReplayCallback() {
Review comment:
Can we give this thread a name?
It should probably be marked as a "daemon" thread.
We should also ensure that the thread ends when we are shutting down this
TopicTransactionBuffer;
otherwise we will have a zombie thread that still retains references
to this object and possibly corrupts its state.
One question: isn't it too heavyweight to start a thread for each topic?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -243,10 +241,13 @@ public void
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
});
List<MessageId> messageIdList = new ArrayList<>();
- for (MessageIdData messageIdData : messageIdDataList) {
- messageIdList.add(new MessageIdImpl(
- messageIdData.getLedgerId(),
messageIdData.getEntryId(), messageIdData.getPartition()));
- messageIdData.recycle();
+ //TODO when pending ack buffer finish this logic can remove
Review comment:
can we link an issue for this TODO ?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -266,20 +267,36 @@ public void
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
}
completableFutureList.add(actionFuture);
});
+
FutureUtil.waitForAll(completableFutureList).whenComplete((ignored,
waitThrowable) -> {
+ if (waitThrowable != null) {
+ resultFuture.completeExceptionally(waitThrowable);
+ endTxnInTransactionBuffer(txnID, txnAction,
messageIdDataList);
+ return;
+ }
+ resultFuture.complete(null);
+ TxnStatus newStatus;
+ TxnStatus expectedStatus;
+ if (txnAction == TxnAction.COMMIT_VALUE) {
+ newStatus = TxnStatus.COMMITTED;
+ expectedStatus = TxnStatus.COMMITTING;
+ } else {
+ newStatus = TxnStatus.ABORTED;
+ expectedStatus = TxnStatus.ABORTING;
+ }
+ //TODO find a better way to handle this failure when update
transaction sstatus
+ finalityEndTransaction(txnID, newStatus, expectedStatus);
+ });
+ });
+ return resultFuture;
+ }
- try {
-
FutureUtil.waitForAll(completableFutureList).whenComplete((ignored,
waitThrowable) -> {
- if (waitThrowable != null) {
- resultFuture.completeExceptionally(waitThrowable);
- return;
- }
- resultFuture.complete(null);
- });
- } catch (Exception e) {
- resultFuture.completeExceptionally(e);
+ private void finalityEndTransaction(TxnID txnID, TxnStatus newStatus,
TxnStatus expectedStatus) {
Review comment:
What about renaming "finalityEndTransaction" to
"completeEndTransaction"?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
##########
@@ -18,32 +18,126 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;
+import static org.apache.pulsar.common.protocol.Markers.isTxnMarker;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import
org.apache.pulsar.broker.transaction.buffer.TransactionBufferReplayCallback;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.SpscArrayQueue;
/**
* Transaction buffer based on normal persistent topic.
*/
@Slf4j
-public class TopicTransactionBuffer implements TransactionBuffer {
+public class TopicTransactionBuffer extends TopicTransactionBufferState
implements TransactionBuffer {
private final PersistentTopic topic;
- public TopicTransactionBuffer(PersistentTopic topic) {
+ private final SpscArrayQueue<Entry> entryQueue;
+
+ private final ConcurrentOpenHashMap<TxnID,
ConcurrentOpenHashSet<PositionImpl>> txnBufferCache = new
ConcurrentOpenHashMap<>();
+
+ //this is for transaction buffer replay start position
+ //this will be stored in managed ledger properties and every 10000 will
sync to zk by default.
+ private final ConcurrentSkipListSet<PositionImpl> positionsSort = new
ConcurrentSkipListSet<>();
+
+ private final ManagedCursor cursor;
+
+ //this is for replay
+ private final PositionImpl lastConfirmedEntry;
+ //this if for replay
+ private PositionImpl currentLoadPosition;
+
+ private final AtomicInteger countToSyncPosition = new AtomicInteger(0);
+
+ private final static String TXN_ON_GOING_POSITION_SUFFIX =
"-txnOnGoingPosition";
+
+ private final String txnOnGoingPositionName;
+
+ //TODO this can config
+ private int defaultCountToSyncPosition = 10000;
+
+ public TopicTransactionBuffer(PersistentTopic topic) throws
ManagedLedgerException {
+ super(State.None);
+ this.entryQueue = new SpscArrayQueue<>(2000);
this.topic = topic;
+ ManagedLedger managedLedger = topic.getManagedLedger();
+ this.lastConfirmedEntry = (PositionImpl)
managedLedger.getLastConfirmedEntry();
+ this.txnOnGoingPositionName = topic.getName() +
TXN_ON_GOING_POSITION_SUFFIX;
+ String positionString =
managedLedger.getProperties().get(txnOnGoingPositionName);
+ if (positionString == null) {
+ this.currentLoadPosition = PositionImpl.earliest;
+ } else {
+ PositionImpl position = PositionImpl.earliest;
+ try {
+ position =
PositionImpl.convertStringToPosition(positionString);
+ } catch (Exception e) {
+ log.error("Topic : [{}] transaction buffer get replay start
position error!", topic.getName());
+ }
+ this.currentLoadPosition = position;
+ }
+ this.cursor = managedLedger.newNonDurableCursor(currentLoadPosition);
+
+ new Thread(() -> new TopicTransactionBufferReplayer(new
TransactionBufferReplayCallback() {
+
+ @Override
+ public void replayComplete() {
+ if (!changeToReadyState()) {
+ log.error("Managed ledger transaction metadata store
change state error when replay complete");
+ }
+ }
+
+ @Override
+ public void handleMetadataEntry(Position position, MessageMetadata
messageMetadata) {
+ if (!messageMetadata.hasTxnidMostBits() ||
!messageMetadata.hasTxnidLeastBits()) {
+ return;
+ }
+ TxnID txnID = new TxnID(messageMetadata.getTxnidMostBits(),
+ messageMetadata.getTxnidLeastBits());
+ if (isTxnMarker(messageMetadata)) {
+ ConcurrentOpenHashSet<PositionImpl> positions =
txnBufferCache.remove(txnID);
+ positionsSort.removeAll(positions.values());
+ } else {
+ ConcurrentOpenHashSet<PositionImpl> positions =
+ txnBufferCache.computeIfAbsent(txnID, (v) -> new
ConcurrentOpenHashSet<>());
+ positions.add((PositionImpl) position);
+ positionsSort.add((PositionImpl) position);
+ }
+ }
+ }).start()).start();
Review comment:
We should not start a Thread in a constructor, especially when the
Thread refers to the object that is being created. We could have bad memory
leaks if we do not properly release all of the resources.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]