codelipenghui commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1374224203
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
}
public void exceptionHandleFuture(Throwable t) {
+ if(isRetryableException(t)) {
+ long retryTime = backoff.next();
+ log.warn("[{}][{}] Failed to init transaction pending ack handler. It will be retried in {} Ms",
+ persistentSubscription.getTopic().getName(), subName, retryTime, t);
+ transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
+ return;
+ }
+ log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, t);
+ handleCacheRequest();
+ changeToErrorState();
+ this.pendingAckStoreFuture.completeExceptionally(new TransactionPendingAckException
+ .TransactionPendingAckStoreInitException(String.format("PendingAckHandleImpl init fail! TopicName : "
Review Comment:
`TransactionPendingAckStoreInitException` is already defined, so the
exception message should be provided by that class rather than built here.
I also don't think we need to add the topic name or subscription name:
the caller already knows which topic and subscription it requested.
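To illustrate the suggestion, here is a minimal sketch in which the exception class owns its message and the call site passes only the cause. The class names, the constructor shape, and the message text are hypothetical stand-ins, not the actual Pulsar definitions.

```java
// Hypothetical stand-ins; the real classes live in Pulsar's transaction code
// and may have different constructors.
class PendingAckExceptionSketch extends Exception {
    PendingAckExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }

    /** The subclass owns its message, so call sites only pass the cause. */
    static class StoreInitException extends PendingAckExceptionSketch {
        // Assumed wording, chosen for this sketch only.
        static final String DEFAULT_MESSAGE = "Failed to initialize the pending ack store";

        StoreInitException(Throwable cause) {
            super(DEFAULT_MESSAGE, cause);
        }
    }

    public static void main(String[] args) {
        Exception e = new StoreInitException(new RuntimeException("mock fail"));
        System.out.println(e.getMessage());
    }
}
```

With this shape, `exceptionHandleFuture` would not need `String.format` at all; the original cause still travels with the exception.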
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
}
public void exceptionHandleFuture(Throwable t) {
+ if(isRetryableException(t)) {
+ long retryTime = backoff.next();
+ log.warn("[{}][{}] Failed to init transaction pending ack handler. It will be retried in {} Ms",
+ persistentSubscription.getTopic().getName(), subName, retryTime, t);
+ transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
+ return;
+ }
+ log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, t);
Review Comment:
Please follow the same
[format](https://github.com/apache/pulsar/pull/21274/files#diff-26056e00d43a29a9ff02a53654d692aef03e971fcec6c4f57b546afcfff2250dR960)
instead of defining a new log format arbitrarily.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:
##########
@@ -945,12 +955,35 @@ public void completeHandleFuture() {
}
public void exceptionHandleFuture(Throwable t) {
+ if(isRetryableException(t)) {
+ long retryTime = backoff.next();
+ log.warn("[{}][{}] Failed to init transaction pending ack handler. It will be retried in {} Ms",
+ persistentSubscription.getTopic().getName(), subName, retryTime, t);
+ transactionOpTimer.newTimeout((timeout) -> init(), retryTime, TimeUnit.MILLISECONDS);
+ return;
+ }
+ log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, t);
+ handleCacheRequest();
+ changeToErrorState();
+ this.pendingAckStoreFuture.completeExceptionally(new TransactionPendingAckException
+ .TransactionPendingAckStoreInitException(String.format("PendingAckHandleImpl init fail! TopicName : "
+ + "%s, SubName: %s", topicName, subName), t));
final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t);
if (completedNow) {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
+ private static boolean isRetryableException(Throwable ex) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ return (realCause instanceof ManagedLedgerException
+ || realCause instanceof PulsarClientException.BrokerPersistenceException
+ || realCause instanceof PulsarClientException.LookupException
+ || realCause instanceof PulsarClientException.ConnectException)
+ && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException)
+ && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException);
Review Comment:
This is really hard to read.
I think you mean:
```java
return (realCause instanceof ManagedLedgerException
        && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException)
        && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
        || realCause instanceof PulsarClientException.BrokerPersistenceException
        || realCause instanceof PulsarClientException.LookupException
        || realCause instanceof PulsarClientException.ConnectException;
```
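One way to make the intent even more explicit is to split the predicate into two named helpers. The sketch below is self-contained and uses hypothetical stand-in exception classes and a simplified unwrap step in place of the Pulsar types and `FutureUtil.unwrapCompletionException`; the helper names are assumptions for illustration.

```java
import java.util.concurrent.CompletionException;

class RetryableSketch {
    // Hypothetical stand-ins for the Pulsar exception types named in the diff.
    static class ManagedLedgerException extends Exception {}
    static class ManagedLedgerFencedException extends ManagedLedgerException {}
    static class NonRecoverableLedgerException extends ManagedLedgerException {}
    static class BrokerPersistenceException extends Exception {}
    static class LookupException extends Exception {}
    static class ConnectException extends Exception {}

    /** Managed ledger failures are retryable unless fenced or non-recoverable. */
    static boolean isRetryableManagedLedgerException(Throwable t) {
        return t instanceof ManagedLedgerException
                && !(t instanceof ManagedLedgerFencedException)
                && !(t instanceof NonRecoverableLedgerException);
    }

    /** Client-side failures that are treated as transient in this sketch. */
    static boolean isRetryableClientException(Throwable t) {
        return t instanceof BrokerPersistenceException
                || t instanceof LookupException
                || t instanceof ConnectException;
    }

    static boolean isRetryableException(Throwable ex) {
        // Simplified stand-in for FutureUtil.unwrapCompletionException.
        Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                ? ex.getCause() : ex;
        return isRetryableManagedLedgerException(cause) || isRetryableClientException(cause);
    }

    public static void main(String[] args) {
        System.out.println(isRetryableException(new ManagedLedgerException()));
        System.out.println(isRetryableException(new ManagedLedgerFencedException()));
        System.out.println(isRetryableException(new CompletionException(new LookupException())));
    }
}
```

The behavior is intended to match the expression above; only the grouping is named.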
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##########
@@ -99,6 +104,52 @@ protected void cleanup() {
super.internalCleanup();
}
+ @Test
+ public void testUnloadSubscriptionWhenFailedInitPendingAck() throws Exception {
+ String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .subscriptionName("subName1")
+ .topic(topic)
+ .subscribe();
+ // Fail at transactionPendingAckStoreProvider::checkInitializedBefore.
+ Field transactionPendingAckStoreProviderField = PulsarService.class
+ .getDeclaredField("transactionPendingAckStoreProvider");
+ transactionPendingAckStoreProviderField.setAccessible(true);
+ TransactionPendingAckStoreProvider pendingAckStoreProvider =
+ (TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField
+ .get(pulsarServiceList.get(0));
+ TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass());
+ // Test retryable exception.
+ when(mockProvider.checkInitializedBefore(any()))
+ .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail")))
+ .thenReturn(CompletableFuture.completedFuture(false));
+ transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
+ Awaitility.await().until(() -> {
+ pulsarClient.newConsumer()
+ .subscriptionName("subName2")
+ .topic(topic)
+ .subscribe();
+ return true;
+ });
Review Comment:
How can this test pass? The retry logic also calls
`pendingAckStoreProvider.checkInitializedBefore`, so it should fail
consistently no matter how many times it retries.
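The question comes down to whether the stubbed call returns a different result on a later invocation. A minimal sketch for reasoning about that in isolation, using a hypothetical stand-in supplier rather than Mockito or the real provider (all names here are assumptions):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {
    /** Calls the check up to maxAttempts times; returns the attempt that succeeded, or -1. */
    static int attemptsUntilSuccess(Supplier<CompletableFuture<Boolean>> check, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // The futures in this sketch are already completed when returned.
            if (!check.get().isCompletedExceptionally()) {
                return attempt;
            }
        }
        return -1;
    }

    /** Hypothetical check that fails on the first `failures` calls and succeeds afterwards. */
    static Supplier<CompletableFuture<Boolean>> failingFirst(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() <= failures) {
                CompletableFuture<Boolean> failed = new CompletableFuture<>();
                failed.completeExceptionally(new RuntimeException("mock fail"));
                return failed;
            }
            return CompletableFuture.completedFuture(false);
        };
    }

    public static void main(String[] args) {
        System.out.println(attemptsUntilSuccess(failingFirst(1), 5));                 // prints 2
        System.out.println(attemptsUntilSuccess(failingFirst(Integer.MAX_VALUE), 5)); // prints -1
    }
}
```

If the check fails on every call, no number of retries helps; if it fails only on the first call, the second attempt succeeds.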