congbobo184 commented on a change in pull request #13481:
URL: https://github.com/apache/pulsar/pull/13481#discussion_r786562199
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription
persistentSubscription) {
private void initPendingAckStore() {
if (changeToInitializingState()) {
- synchronized (PendingAckHandleImpl.this) {
- if (!checkIfClose()) {
- this.pendingAckStoreFuture =
-
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
- this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- ((PersistentTopic)
persistentSubscription.getTopic()).getBrokerService()
-
.getPulsar().getTransactionReplayExecutor());
- }).exceptionally(e -> {
- acceptQueue.clear();
- changeToErrorState();
- exceptionHandleFuture(e.getCause());
- log.error("PendingAckHandleImpl init fail! TopicName :
{}, SubName: {}", topicName, subName, e);
- return null;
- });
- }
+ if (!checkIfClose()) {
+ this.pendingAckStoreFuture =
+
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+ this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+ pendingAckStore.replayAsync(this,
+ (ScheduledExecutorService) internalPinnedExecutor);
+ }).exceptionally(e -> {
+ acceptQueue.clear();
+ changeToErrorState();
+ log.error("PendingAckHandleImpl init fail! TopicName : {},
SubName: {}", topicName, subName, e);
+ return null;
+ });
}
}
}
private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions,
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
+ acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID,
positions, completableFuture));
}
- @Override
- public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-
List<MutablePair<PositionImpl, Integer>> positions,
- boolean
isInCacheRequest) {
-
- if (!checkIfReady()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- synchronized (PendingAckHandleImpl.this) {
- switch (state) {
- case Initializing:
- addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
- return completableFuture;
- case None:
- addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
- initPendingAckStore();
- return completableFuture;
- case Error:
- completableFuture.completeExceptionally(
- new
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
- return completableFuture;
- case Close:
- completableFuture.completeExceptionally(
- new
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
- return completableFuture;
- default:
- break;
- }
- }
- }
-
+ public void internalIndividualAcknowledgeMessage(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions,
+ CompletableFuture<Void>
completableFuture) {
if (txnID == null) {
- return FutureUtil.failedFuture(new
NotAllowedException("TransactionID can not be null."));
+ FutureUtil.failedFuture(new NotAllowedException("TransactionID can
not be null."));
Review comment:
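The future returned by FutureUtil.failedFuture(...) is discarded here, so
the caller's completableFuture is never completed. Complete it instead:
completableFuture.completeExceptionally(new
NotAllowedException("TransactionID can not be null."));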
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -289,67 +267,68 @@ private void addIndividualAcknowledgeMessageRequest(TxnID
txnID,
completableFuture.completeExceptionally(e);
return null;
});
- return completableFuture;
- }
-
- private void addCumulativeAcknowledgeMessageRequest(TxnID txnID,
- List<PositionImpl>
positions,
-
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
}
@Override
- public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID,
-
List<PositionImpl> positions,
+ public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
+
List<MutablePair<PositionImpl, Integer>> positions,
boolean
isInCacheRequest) {
Review comment:
Please delete the isInCacheRequest parameter.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription
persistentSubscription) {
private void initPendingAckStore() {
if (changeToInitializingState()) {
- synchronized (PendingAckHandleImpl.this) {
- if (!checkIfClose()) {
- this.pendingAckStoreFuture =
-
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
- this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- ((PersistentTopic)
persistentSubscription.getTopic()).getBrokerService()
-
.getPulsar().getTransactionReplayExecutor());
- }).exceptionally(e -> {
- acceptQueue.clear();
- changeToErrorState();
- exceptionHandleFuture(e.getCause());
Review comment:
Don't delete this call to exceptionHandleFuture(e.getCause()).
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
##########
@@ -130,73 +142,39 @@ public PendingAckHandleImpl(PersistentSubscription
persistentSubscription) {
private void initPendingAckStore() {
if (changeToInitializingState()) {
- synchronized (PendingAckHandleImpl.this) {
- if (!checkIfClose()) {
- this.pendingAckStoreFuture =
-
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
- this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
- pendingAckStore.replayAsync(this,
- ((PersistentTopic)
persistentSubscription.getTopic()).getBrokerService()
-
.getPulsar().getTransactionReplayExecutor());
- }).exceptionally(e -> {
- acceptQueue.clear();
- changeToErrorState();
- exceptionHandleFuture(e.getCause());
- log.error("PendingAckHandleImpl init fail! TopicName :
{}, SubName: {}", topicName, subName, e);
- return null;
- });
- }
+ if (!checkIfClose()) {
+ this.pendingAckStoreFuture =
+
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
+ this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+ pendingAckStore.replayAsync(this,
+ (ScheduledExecutorService) internalPinnedExecutor);
+ }).exceptionally(e -> {
+ acceptQueue.clear();
+ changeToErrorState();
+ log.error("PendingAckHandleImpl init fail! TopicName : {},
SubName: {}", topicName, subName, e);
+ return null;
+ });
}
}
}
private void addIndividualAcknowledgeMessageRequest(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions,
CompletableFuture<Void> completableFuture) {
- acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions,
true).thenAccept(v ->
- completableFuture.complete(null)).exceptionally(e -> {
- completableFuture.completeExceptionally(e);
- return null;
- }));
+ acceptQueue.add(() -> internalIndividualAcknowledgeMessage(txnID,
positions, completableFuture));
}
- @Override
- public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
-
List<MutablePair<PositionImpl, Integer>> positions,
- boolean
isInCacheRequest) {
-
- if (!checkIfReady()) {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- synchronized (PendingAckHandleImpl.this) {
- switch (state) {
- case Initializing:
- addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
- return completableFuture;
- case None:
- addIndividualAcknowledgeMessageRequest(txnID,
positions, completableFuture);
- initPendingAckStore();
- return completableFuture;
- case Error:
- completableFuture.completeExceptionally(
- new
ServiceUnitNotReadyException("PendingAckHandle not replay error!"));
- return completableFuture;
- case Close:
- completableFuture.completeExceptionally(
- new
ServiceUnitNotReadyException("PendingAckHandle have been closed!"));
- return completableFuture;
- default:
- break;
- }
- }
- }
-
+ public void internalIndividualAcknowledgeMessage(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions,
+ CompletableFuture<Void>
completableFuture) {
if (txnID == null) {
- return FutureUtil.failedFuture(new
NotAllowedException("TransactionID can not be null."));
+ FutureUtil.failedFuture(new NotAllowedException("TransactionID can
not be null."));
+ return;
+
}
if (positions == null) {
- return FutureUtil.failedFuture(new NotAllowedException("Positions
can not be null."));
+ FutureUtil.failedFuture(new NotAllowedException("Positions can not
be null."));
Review comment:
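The future returned by FutureUtil.failedFuture(...) is discarded here, so
the caller's completableFuture is never completed. Complete it instead:
completableFuture.completeExceptionally(new
NotAllowedException("Positions can not be null."));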
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]