This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit be4f9a7f251bbfa34951b0eceb7624f7774ac771
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Aug 10 22:24:15 2021 +0800

    Pending ack set managed ledger config true (#11494)
    
    ## Motivation
    Fixes the issue https://github.com/apache/pulsar/issues/11481
    In standalone mode, Pulsar 2.8.0 cannot be used normally when 
transactions are turned on.
    
    ## Cause
    ```getTopic``` was executed twice in FunctionWorkService.
    ```getTopicIfExists``` makes ```createIfMissing = false``` when the execution 
ends.
    ```PersistentSubscription``` creates a ledger for the subscription 
when transactions are turned on.
    ```new MetadataNotFoundException("Managed ledger not found")``` was thrown 
when calling ```MetaStoreImpl::getManagedLedgerInfo```.
    ## Implementation
    Create a separate ManagedLedgerConfig for PendingAck.
    ## Verification
    Add testSubscriptionRecreateTopic to TransactionTest.
    
    (cherry picked from commit daf457db611397f2e699880e51ad584aff285bfa)
---
 .../pendingack/impl/MLPendingAckStoreProvider.java | 85 +++++++++++++-------
 .../pulsar/broker/transaction/TransactionTest.java | 93 ++++++++++++++++++++--
 .../broker/transaction/TransactionTestBase.java    |  1 +
 3 files changed, 144 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index 0741f06..4dbee19 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -32,6 +32,7 @@ import 
org.apache.pulsar.broker.transaction.pendingack.exceptions.TransactionPen
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.naming.TopicName;
 
+
 /**
  * Provider is for MLPendingAckStore.
  */
@@ -47,40 +48,68 @@ public class MLPendingAckStoreProvider implements 
TransactionPendingAckStoreProv
                     new TransactionPendingAckStoreProviderException("The 
subscription is null."));
             return pendingAckStoreFuture;
         }
-
         PersistentTopic originPersistentTopic = (PersistentTopic) 
subscription.getTopic();
         String pendingAckTopicName = MLPendingAckStore
                 
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), 
subscription.getName());
-
         originPersistentTopic.getBrokerService().getManagedLedgerFactory()
-                
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
-                        originPersistentTopic.getManagedLedger().getConfig(),
-                        new AsyncCallbacks.OpenLedgerCallback() {
-                            @Override
-                            public void openLedgerComplete(ManagedLedger 
ledger, Object ctx) {
-                                
ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
-                                        InitialPosition.Earliest, new 
AsyncCallbacks.OpenCursorCallback() {
-                                            @Override
-                                            public void 
openCursorComplete(ManagedCursor cursor, Object ctx) {
-                                                pendingAckStoreFuture
-                                                        .complete(new 
MLPendingAckStore(ledger, cursor,
-                                                                
subscription.getCursor()));
-                                            }
+                .asyncExists(TopicName.get(pendingAckTopicName)
+                        .getPersistenceNamingEncoding()).thenAccept(exist -> {
+            TopicName topicName;
+            if (exist) {
+                topicName = TopicName.get(pendingAckTopicName);
+            } else {
+                topicName = TopicName.get(originPersistentTopic.getName());
+            }
+            originPersistentTopic.getBrokerService()
+                    .getManagedLedgerConfig(topicName).thenAccept(config -> {
+                config.setCreateIfMissing(true);
+                
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
+                        
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
+                                config, new 
AsyncCallbacks.OpenLedgerCallback() {
+                                    @Override
+                                    public void 
openLedgerComplete(ManagedLedger ledger, Object ctx) {
+                                        ledger.asyncOpenCursor(
+                                                
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
+                                                InitialPosition.Earliest, new 
AsyncCallbacks.OpenCursorCallback() {
+                                                    @Override
+                                                    public void 
openCursorComplete(ManagedCursor cursor, Object ctx) {
+                                                        pendingAckStoreFuture
+                                                                .complete(new 
MLPendingAckStore(ledger, cursor,
+                                                                        
subscription.getCursor()));
+                                                        if 
(log.isDebugEnabled()) {
+                                                            log.debug("{},{} 
open MLPendingAckStore cursor success",
+                                                                    
originPersistentTopic.getName(),
+                                                                    
subscription.getName());
+                                                        }
+                                                    }
 
-                                            @Override
-                                            public void 
openCursorFailed(ManagedLedgerException exception, Object ctx) {
-                                                log.error("Open 
MLPendingAckStore cursor failed.", exception);
-                                                
pendingAckStoreFuture.completeExceptionally(exception);
-                                            }
-                                        }, null);
-                            }
+                                                    @Override
+                                                    public void 
openCursorFailed(ManagedLedgerException exception,
+                                                                               
  Object ctx) {
+                                                        log.error("{},{} open 
MLPendingAckStore cursor failed."
+                                                                , 
originPersistentTopic.getName(),
+                                                                
subscription.getName(), exception);
+                                                        
pendingAckStoreFuture.completeExceptionally(exception);
+                                                    }
+                                                }, null);
+                                    }
 
-                            @Override
-                            public void 
openLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                log.error("Open MLPendingAckStore 
managedLedger failed.", exception);
-                                
pendingAckStoreFuture.completeExceptionally(exception);
-                            }
-                        }, () -> true, null);
+                                    @Override
+                                    public void 
openLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                                        log.error("{}, {} open 
MLPendingAckStore managedLedger failed."
+                                                , 
originPersistentTopic.getName(), subscription.getName(), exception);
+                                        
pendingAckStoreFuture.completeExceptionally(exception);
+                                    }
+                                }, () -> true, null);
+            });
+        }).exceptionally(e -> {
+            log.error("Failed to obtain the existence of ManagerLedger with 
topic and subscription : "
+                    + originPersistentTopic.getSubscriptions() + "  "
+                    + subscription.getName());
+            pendingAckStoreFuture.completeExceptionally(
+                    e.getCause());
+            return null;
+        });
         return pendingAckStoreFuture;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 26a0260..a319c5d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -19,18 +19,26 @@
 package org.apache.pulsar.broker.transaction;
 
 import static 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNotNull;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -40,12 +48,10 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
-import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
-import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -167,4 +173,77 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(txnID.getLeastSigBits(), 1);
         Assert.assertEquals(txnID.getMostSigBits(), 0);
     }
+
+    @Test
+    public void testSubscriptionRecreateTopic()
+            throws PulsarAdminException, NoSuchFieldException, 
IllegalAccessException, PulsarClientException {
+        String topic = "persistent://pulsar/system/testReCreateTopic";
+        String subName = "sub_testReCreateTopic";
+        int retentionSizeInMbSetTo = 5;
+        int retentionSizeInMbSetTopic = 6;
+        int retentionSizeInMinutesSetTo = 5;
+        int retentionSizeInMinutesSetTopic = 6;
+        admin.topics().createNonPartitionedTopic(topic);
+        PulsarService pulsarService = super.getPulsarServiceList().get(0);
+        pulsarService.getBrokerService().getTopics().clear();
+        ManagedLedgerFactory managedLedgerFactory = 
pulsarService.getBrokerService().getManagedLedgerFactory();
+        Field field = 
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+        field.setAccessible(true);
+        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> 
ledgers =
+                (ConcurrentHashMap<String, 
CompletableFuture<ManagedLedgerImpl>>) field.get(managedLedgerFactory);
+        ledgers.remove(TopicName.get(topic).getPersistenceNamingEncoding());
+        try {
+            admin.topics().createNonPartitionedTopic(topic);
+            Assert.fail();
+        } catch (PulsarAdminException.ConflictException e) {
+            log.info("Cann`t create topic again");
+        }
+        admin.topics().setRetention(topic,
+                new RetentionPolicies(retentionSizeInMinutesSetTopic, 
retentionSizeInMbSetTopic));
+        pulsarClient.newConsumer().topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+        
pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
+            if (!option.isPresent()) {
+                log.error("Failed o get Topic named: {}", topic);
+                Assert.fail();
+            }
+            PersistentTopic originPersistentTopic = (PersistentTopic) 
option.get();
+            String pendingAckTopicName = MLPendingAckStore
+                    
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subName);
+
+            try {
+                admin.topics().setRetention(pendingAckTopicName,
+                        new RetentionPolicies(retentionSizeInMinutesSetTo, 
retentionSizeInMbSetTo));
+            } catch (PulsarAdminException e) {
+                log.error("Failed to get./setRetention of topic with 
Exception:" + e);
+                Assert.fail();
+            }
+            PersistentSubscription subscription = originPersistentTopic
+                    .getSubscription(subName);
+            subscription.getPendingAckManageLedger().thenAccept(managedLedger 
-> {
+                long retentionSize = 
managedLedger.getConfig().getRetentionSizeInMB();
+                if (!originPersistentTopic.getTopicPolicies().isPresent()) {
+                    log.error("Failed to getTopicPolicies of :" + 
originPersistentTopic);
+                    Assert.fail();
+                }
+                TopicPolicies topicPolicies = 
originPersistentTopic.getTopicPolicies().get();
+                Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
+                MLPendingAckStoreProvider mlPendingAckStoreProvider = new 
MLPendingAckStoreProvider();
+                CompletableFuture<PendingAckStore> future = 
mlPendingAckStoreProvider.newPendingAckStore(subscription);
+                future.thenAccept(pendingAckStore -> {
+                            ((MLPendingAckStore) 
pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
+                                
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
+                                        retentionSizeInMbSetTo);
+                            });
+                        }
+                );
+            });
+
+
+        });
+
+
+    }
+
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 67ecd87..465a7aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -137,6 +137,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
             conf.setSystemTopicEnabled(true);
             conf.setTransactionBufferSnapshotMaxTransactionCount(2);
             conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
+            conf.setTopicLevelPoliciesEnabled(true);
             serviceConfigurationList.add(conf);
 
             PulsarService pulsar = spy(new PulsarService(conf));

Reply via email to