congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r791797179
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -500,58 +466,43 @@ private void addCommitTxnRequest(TxnID txnId, Map<String,
Long> properties, long
return null;
});
}
- return commitFuture;
- }
-
- private void addAbortTxnRequest(TxnID txnId, Consumer consumer, long
lowWaterMark,
- CompletableFuture<Void> completableFuture)
{
- acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
}
@Override
- public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer
consumer,
- long lowWaterMark,
boolean isInCacheRequest) {
- if (!checkIfReady()) {
- synchronized (PendingAckHandleImpl.this) {
+ public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long>
properties, long lowWaterMark) {
+ CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
if (state == State.Initializing) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- addAbortTxnRequest(txnId, consumer, lowWaterMark,
completableFuture);
- return completableFuture;
+ addCommitTxnRequest(txnID, properties, lowWaterMark,
commitFuture);
Review comment:
Consider using a switch-case on the state here instead of nested if checks.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,40 @@ public PendingAckHandleImpl(PersistentSubscription
persistentSubscription) {
private void initPendingAckStore() {
if (changeToInitializingState()) {
- synchronized (PendingAckHandleImpl.this) {
- if (!checkIfClose()) {
- this.pendingAckStoreFuture =
-
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
- this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- ((PersistentTopic)
persistentSubscription.getTopic()).getBrokerService()
-
.getPulsar().getTransactionReplayExecutor());
- }).exceptionally(e -> {
- acceptQueue.clear();
- changeToErrorState();
- exceptionHandleFuture(e.getCause());
- log.error("PendingAckHandleImpl init fail! TopicName :
{}, SubName: {}", topicName, subName, e);
- return null;
- });
- }
+ if (!checkIfClose()) {
+ this.pendingAckStoreFuture =
+
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+ this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+ pendingAckStore.replayAsync(this,
+ (ScheduledExecutorService) internalPinnedExecutor);
+ }).exceptionally(e -> {
+ acceptQueue.clear();
+ changeToErrorState();
+ log.error("PendingAckHandleImpl init fail! TopicName : {},
SubName: {}", topicName, subName, e);
+ exceptionHandleFuture(e.getCause());
Review comment:
When calling exceptionHandleFuture(), we should handle the requests which are
in the cache instead of clearing the queue. This could be implemented in the next PR.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -611,6 +562,34 @@ private void addAbortTxnRequest(TxnID txnId, Consumer
consumer, long lowWaterMar
return abortFuture;
}
+ @Override
+ public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer,
long lowWaterMark) {
+ CompletableFuture<Void> abortFuture = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ if (!checkIfReady()) {
+ if (state == State.Initializing) {
Review comment:
Using a switch-case would be better here.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
##########
@@ -309,11 +310,10 @@ public PulsarService(ServiceConfiguration config,
new DefaultThreadFactory("zk-cache-callback"));
if (config.isTransactionCoordinatorEnabled()) {
- this.transactionReplayExecutor = Executors.newScheduledThreadPool(
- config.getNumTransactionReplayThreadPoolSize(),
- new DefaultThreadFactory("transaction-replay"));
+ this.transactionExecutorProvider = new
ExecutorProvider(this.getConfiguration().getNumIOThreads(),
Review comment:
Use config.getNumTransactionReplayThreadPoolSize() here instead of getNumIOThreads().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]