This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1129814 Fix TopicPoliciesCacheNotInitException issue. (#12773)
1129814 is described below
commit 11298144ac118cda951deffa092ab17110d254b7
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Nov 18 13:44:42 2021 +0800
Fix TopicPoliciesCacheNotInitException issue. (#12773)
### Motivation
Sometimes we get a `TopicPoliciesCacheNotInitException` with the following
stack trace:
```
15:45:47.020 [pulsar-web-41-3] INFO org.eclipse.jetty.server.RequestLog -
10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2
"-" "kube-probe/1.19+" 1
15:45:51.221 [pulsar-2-15] ERROR
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to
perform getRetention on topic persistent://public/default/UpdateNodeCharts
java.lang.RuntimeException:
org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException:
Topic policies cache have not init.
at
org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84)
~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
at
org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50)
~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
at
org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63)
~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
```
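This happens because of the following code:
https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312
When `reader.readNextAsync()` completes exceptionally, `msg` is null, and
because the error branch does not return, the following
`refreshTopicPoliciesCache(msg)` call throws an NPE that nothing catches.
The cache initialization therefore never finishes: the namespace's entry in
`policyCacheInitMap` stays `false`, and `getTopicPolicies` keeps throwing
`TopicPoliciesCacheNotInitException` for that namespace.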
---
.../broker/TransactionMetadataStoreService.java | 5 ++-
.../SystemTopicBasedTopicPoliciesService.java | 42 ++++++++++++++--------
.../SystemTopicBasedTopicPoliciesServiceTest.java | 40 +++++++++++++++++++++
.../coordinator/impl/MLTransactionLogImpl.java | 8 +++--
4 files changed, 76 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 607f05e..240c6c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker;
+import static
org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName;
import static
org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING;
import static
org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING;
import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +64,6 @@ import org.apache.pulsar.transaction.coordinator.TxnMeta;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -230,8 +230,7 @@ public class TransactionMetadataStoreService {
public CompletableFuture<TransactionMetadataStore>
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
return pulsarService.getBrokerService()
- .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
- .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+
.getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
TransactionRecoverTracker recoverTracker =
new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 07fe239..d0c3d49 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -171,6 +171,10 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
@Override
public TopicPolicies getTopicPolicies(TopicName topicName) throws
TopicPoliciesCacheNotInitException {
+ if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+ NamespaceName namespace = topicName.getNamespaceObject();
+ prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+ }
if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
&& !policyCacheInitMap.get(topicName.getNamespaceObject())) {
throw new TopicPoliciesCacheNotInitException();
@@ -209,24 +213,29 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
result.complete(null);
} else {
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new
AtomicInteger(1));
- policyCacheInitMap.put(namespace, false);
- CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- creatSystemTopicClientWithRetry(namespace);
- readerCaches.put(namespace, readerCompletableFuture);
- readerCompletableFuture.whenComplete((reader, ex) -> {
- if (ex != null) {
- log.error("[{}] Failed to create reader on
__change_events topic", namespace, ex);
- result.completeExceptionally(ex);
- } else {
- initPolicesCache(reader, result);
- result.thenRun(() -> readMorePolicies(reader));
- }
- });
+ prepareInitPoliciesCache(namespace, result);
}
}
return result;
}
+ private void prepareInitPoliciesCache(NamespaceName namespace,
CompletableFuture<Void> result) {
+ if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+ creatSystemTopicClientWithRetry(namespace);
+ readerCaches.put(namespace, readerCompletableFuture);
+ readerCompletableFuture.whenComplete((reader, ex) -> {
+ if (ex != null) {
+ log.error("[{}] Failed to create reader on __change_events
topic", namespace, ex);
+ result.completeExceptionally(ex);
+ } else {
+ initPolicesCache(reader, result);
+ result.thenRun(() -> readMorePolicies(reader));
+ }
+ });
+ }
+ }
+
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
creatSystemTopicClientWithRetry(
NamespaceName namespace) {
SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
@@ -294,6 +303,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(ex);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ reader.closeAsync();
+ return;
}
if (hasMore) {
reader.readNextAsync().whenComplete((msg, e) -> {
@@ -302,6 +314,9 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
reader.getSystemTopic().getTopicName(), ex);
future.completeExceptionally(e);
readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+
policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+ reader.closeAsync();
+ return;
}
refreshTopicPoliciesCache(msg);
if (log.isDebugEnabled()) {
@@ -316,7 +331,6 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
policyCacheInitMap.computeIfPresent(
reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
-
// replay policy message
policiesCache.forEach(((topicName, topicPolicies) -> {
if (listeners.get(topicName) != null) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 532bd33..4c43492 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -31,8 +31,12 @@ import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
@@ -276,4 +280,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
assertEquals(reader1, reader);
}
+
+ @Test
+ public void testGetTopicPoliciesWithRetry() throws Exception {
+ Field initMapField =
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+ initMapField.setAccessible(true);
+ Map<NamespaceName, Boolean> initMap =
(Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+ initMap.remove(NamespaceName.get(NAMESPACE1));
+ Field readerCaches =
SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+ readerCaches.setAccessible(true);
+ Map<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers =
(Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+ readers.remove(NamespaceName.get(NAMESPACE1));
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(500, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
+ .setMax(1000, TimeUnit.MILLISECONDS)
+ .create();
+ TopicPolicies initPolicy = TopicPolicies.builder()
+ .maxConsumerPerTopic(10)
+ .build();
+ ScheduledExecutorService executors =
Executors.newScheduledThreadPool(1);
+ executors.schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+
systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1,
initPolicy).get();
+ } catch (Exception ignore) {}
+ }
+ }, 2000, TimeUnit.MILLISECONDS);
+ Awaitility.await().untilAsserted(() -> {
+ Optional<TopicPolicies> topicPolicies =
systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1,
backoff, pulsar.getExecutor()).get();
+ Assert.assertTrue(topicPolicies.isPresent());
+ if (topicPolicies.isPresent()) {
+ Assert.assertEquals(topicPolicies.get(), initPolicy);
+ }
+ });
+ }
}
diff --git
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index f2324af..c044275 100644
---
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog {
public MLTransactionLogImpl(TransactionCoordinatorID tcID,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig) {
- this.topicName = TopicName.get(TopicDomain.persistent.value(),
- NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX +
tcID.getId());
+ this.topicName = getMLTransactionLogName(tcID);
this.tcId = tcID.getId();
this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
@@ -83,6 +82,11 @@ public class MLTransactionLogImpl implements TransactionLog {
this.entryQueue = new SpscArrayQueue<>(2000);
}
+ public static TopicName getMLTransactionLogName(TransactionCoordinatorID
tcID) {
+ return TopicName.get(TopicDomain.persistent.value(),
+ NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX +
tcID.getId());
+ }
+
@Override
public CompletableFuture<Void> initialize() {
CompletableFuture<Void> future = new CompletableFuture<>();