congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r776121803
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java
##########
@@ -44,7 +44,7 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl
pendingAckHandle) {
@Override
public void replayComplete() {
- synchronized (pendingAckHandle) {
+ pendingAckHandle.getInternalPinnedExecutor().submit(() -> {
Review comment:
Use the `execute` method here. `submit` returns a `Future`, and we don't
need that future.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -291,46 +275,51 @@ private void addIndividualAcknowledgeMessageRequest(TxnID
txnID,
return completableFuture;
}
- private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
- List<PositionImpl>
positions,
-
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
- }
-
@Override
- public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-
List<PositionImpl> positions,
+ public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+
List<MutablePair<PositionImpl, Integer>> positions,
boolean
isInCacheRequest) {
- if (!checkIfReady()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- synchronized (PendingAckHandleImpl.this) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
+ if (!checkIfReady()) {
switch (state) {
case Initializing:
- addCumulativeAcknowledgeMessageRequest(txnID,
positions, completableFuture);
- return completableFuture;
+ addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
+ return;
case None:
- addCumulativeAcknowledgeMessageRequest(txnID,
positions, completableFuture);
+ addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
initPendingAckStore();
- return completableFuture;
+ return;
case Error:
completableFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
- return completableFuture;
+ return;
case Close:
completableFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
- return completableFuture;
+ return;
default:
break;
-
}
}
- }
+ internalIndividualAcknowledgeMessage(txnID, positions,
completableFuture);
+ });
+ return completableFuture;
+ }
+ private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
+ List<PositionImpl>
positions,
+
CompletableFuture<Void> completableFuture) {
+ acceptQueue.add(() -> internalCumulativeAcknowledgeMessage(txnID,
positions, new CompletableFuture<>())
Review comment:
Same as the comment on `addIndividualAcknowledgeMessageRequest`: there should be no need to create a new `CompletableFuture` here.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -610,6 +586,37 @@ private void addAbortTxnRequest(TxnID txnId, Consumer
consumer, long lowWaterMar
return abortFuture;
}
+ @Override
+ public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer,
+ long lowWaterMark,
boolean isInCacheRequest) {
+ CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
Review comment:
Use `execute` instead of `submit`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -305,6 +307,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup
eventLoopGroup) throws
.newSingleThreadScheduledExecutor(new
DefaultThreadFactory("pulsar-stats-updater"));
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
+ this.internalExecutorService = new
ExecutorProvider(pulsar.getConfiguration().getNumIOThreads(),
Review comment:
Currently only the transaction pending ack uses this executor, so we can
initialize it only when transactions are enabled.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -126,76 +132,54 @@ public PendingAckHandleImpl(PersistentSubscription
persistentSubscription) {
completeHandleFuture();
}
});
+
+ internalPinnedExecutor = persistentSubscription
+ .getTopic()
+ .getBrokerService()
+ .getInternalExecutorService()
+ .getExecutor();
}
private void initPendingAckStore() {
if (changeToInitializingState()) {
- synchronized (PendingAckHandleImpl.this) {
- if (!checkIfClose()) {
- this.pendingAckStoreFuture =
-
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
- this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- ((PersistentTopic)
persistentSubscription.getTopic()).getBrokerService()
-
.getPulsar().getTransactionReplayExecutor());
- }).exceptionally(e -> {
- acceptQueue.clear();
- changeToErrorState();
- log.error("PendingAckHandleImpl init fail! TopicName :
{}, SubName: {}", topicName, subName, e);
- return null;
- });
- }
+ if (!checkIfClose()) {
+ this.pendingAckStoreFuture =
+
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+ this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+ pendingAckStore.replayAsync(this,
+
persistentSubscription.getTopic().getBrokerService()
+
.getPulsar().getTransactionReplayExecutor());
+ }).exceptionally(e -> {
+ acceptQueue.clear();
+ changeToErrorState();
+ log.error("PendingAckHandleImpl init fail! TopicName : {},
SubName: {}", topicName, subName, e);
+ return null;
+ });
}
}
}
private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions,
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
+ acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID,
positions, new CompletableFuture<>())
Review comment:
Why do we need to create a new `CompletableFuture` here? If
`internalIndividualAcknowledgeMessage` can handle the `completableFuture`
that is passed in, we don't need to create a new one.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -291,46 +275,51 @@ private void addIndividualAcknowledgeMessageRequest(TxnID
txnID,
return completableFuture;
}
- private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
- List<PositionImpl>
positions,
-
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
- }
-
@Override
- public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-
List<PositionImpl> positions,
+ public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+
List<MutablePair<PositionImpl, Integer>> positions,
boolean
isInCacheRequest) {
- if (!checkIfReady()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- synchronized (PendingAckHandleImpl.this) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
Review comment:
Use `execute` instead of `submit`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -394,56 +381,50 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID
txnID,
return completableFuture;
}
- private void addCommitTxnRequest(TxnID txnId, Map<String, Long>
properties, long lowWaterMark,
- CompletableFuture<Void> completableFuture)
{
- acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
- }
-
@Override
- public synchronized CompletableFuture<Void> commitTxn(TxnID txnID,
Map<String, Long> properties,
- long lowWaterMark,
boolean isInCacheRequest) {
- if (!checkIfReady()) {
- synchronized (PendingAckHandleImpl.this) {
- if (state == State.Initializing) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
- return completableFuture;
- } else if (state == State.None) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
- initPendingAckStore();
- return completableFuture;
- } else if (checkIfReady()) {
-
- } else {
- if (state == State.Error) {
- return FutureUtil.failedFuture(
+ public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
+
List<PositionImpl> positions,
+ boolean
isInCacheRequest) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
Review comment:
Use `execute` instead of `submit`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -502,55 +483,50 @@ private void addCommitTxnRequest(TxnID txnId, Map<String,
Long> properties, long
return commitFuture;
}
- private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long
lowWaterMark,
- CompletableFuture<Void> completableFuture)
{
- acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
- }
-
@Override
- public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer
consumer,
- long lowWaterMark,
boolean isInCacheRequest) {
- if (!checkIfReady()) {
- synchronized (PendingAckHandleImpl.this) {
+ public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long>
properties,
+ long lowWaterMark,
boolean isInCacheRequest) {
+ CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
+ if (!checkIfReady()) {
if (state == State.Initializing) {
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addAbortTxnRequest(txnId, consumer, lowWaterMark,
completableFuture);
- return completableFuture;
+ addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
+ return;
} else if (state == State.None) {
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addAbortTxnRequest(txnId, consumer, lowWaterMark,
completableFuture);
+ addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
initPendingAckStore();
- return completableFuture;
+ return;
} else if (checkIfReady()) {
} else {
if (state == State.Error) {
- return FutureUtil.failedFuture(
+ commitFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
} else {
- return FutureUtil.failedFuture(
+ commitFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
}
+ return;
}
}
- }
-
+ internalCommitTxn(txnID, properties, lowWaterMark, commitFuture);
+ });
+ return commitFuture;
+ }
- if (!acceptQueue.isEmpty() && !isInCacheRequest) {
- synchronized (PendingAckHandleImpl.this) {
- if (!acceptQueue.isEmpty()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addAbortTxnRequest(txnId, consumer, lowWaterMark,
completableFuture);
- return completableFuture;
- }
- }
- }
+ private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long
lowWaterMark,
+ CompletableFuture<Void> completableFuture)
{
+ acceptQueue.add(() -> internalAbortTxn(txnId, consumer, lowWaterMark,
new CompletableFuture<>())
Review comment:
Same as the comment on `addIndividualAcknowledgeMessageRequest`: there should be no need to create a new `CompletableFuture` here.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -394,56 +381,50 @@ private void addCumulativeAcknowledgeMessageRequest(TxnID
txnID,
return completableFuture;
}
- private void addCommitTxnRequest(TxnID txnId, Map<String, Long>
properties, long lowWaterMark,
- CompletableFuture<Void> completableFuture)
{
- acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
- }
-
@Override
- public synchronized CompletableFuture<Void> commitTxn(TxnID txnID,
Map<String, Long> properties,
- long lowWaterMark,
boolean isInCacheRequest) {
- if (!checkIfReady()) {
- synchronized (PendingAckHandleImpl.this) {
- if (state == State.Initializing) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
- return completableFuture;
- } else if (state == State.None) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
- initPendingAckStore();
- return completableFuture;
- } else if (checkIfReady()) {
-
- } else {
- if (state == State.Error) {
- return FutureUtil.failedFuture(
+ public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
+
List<PositionImpl> positions,
+ boolean
isInCacheRequest) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ internalPinnedExecutor.submit(() -> {
+ if (!checkIfReady()) {
+ switch (state) {
+ case Initializing:
+ addCumulativeAcknowledgeMessageRequest(txnID,
positions, completableFuture);
+ return;
+ case None:
+ addCumulativeAcknowledgeMessageRequest(txnID,
positions, completableFuture);
+ initPendingAckStore();
+ return;
+ case Error:
+ completableFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
- } else {
- return FutureUtil.failedFuture(
+ return;
+ case Close:
+ completableFuture.completeExceptionally(
new
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
- }
-
+ return;
+ default:
+ break;
}
}
- }
+ internalCumulativeAcknowledgeMessage(txnID, positions,
completableFuture);
+ });
- if (!acceptQueue.isEmpty() && !isInCacheRequest) {
- synchronized (PendingAckHandleImpl.this) {
- if (!acceptQueue.isEmpty()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addCommitTxnRequest(txnID, properties, lowWaterMark,
completableFuture);
- return completableFuture;
- }
- }
- }
+ return completableFuture;
+ }
- CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+ private void addCommitTxnRequest(TxnID txnId, Map<String, Long>
properties, long lowWaterMark,
Review comment:
Same as the comment on `addIndividualAcknowledgeMessageRequest`: there should be no need to create a new `CompletableFuture` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]