This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 32fe2288544 [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
32fe2288544 is described below
commit 32fe228854464504d18de240f719b583cf262042
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jul 27 23:45:03 2022 -0700
    [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
---
.../pulsar/broker/service/BrokerService.java | 210 +++++++++++----------
.../broker/service/persistent/PersistentTopic.java | 109 ++++++-----
2 files changed, 172 insertions(+), 147 deletions(-)
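
In the first file below (BrokerService#getManagedLedgerConfig), the blocking policiesCache().get() calls inside the ordered-executor callback are replaced with getAsync() lookups combined via thenCombine, so a metadata cache miss no longer parks the thread. As a rough orientation, here is a standalone sketch of that pattern using only java.util.concurrent.CompletableFuture; the class and method names (AsyncConfigSketch, fetchPolicies, fetchLocalPolicies, getConfig) are hypothetical stand-ins, not Pulsar APIs.

// Minimal sketch of the deadlock-fix pattern: combine two asynchronous metadata lookups
// with thenCombine instead of blocking on cache get() calls. All names here are
// hypothetical stand-ins, not Pulsar APIs.
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class AsyncConfigSketch {

    // Simulated async metadata lookups; in the broker these would be cache getAsync() calls.
    static CompletableFuture<Optional<String>> fetchPolicies() {
        return CompletableFuture.supplyAsync(() -> Optional.of("namespace-policies"));
    }

    static CompletableFuture<Optional<String>> fetchLocalPolicies() {
        return CompletableFuture.supplyAsync(() -> Optional.<String>empty());
    }

    static CompletableFuture<String> getConfig() {
        CompletableFuture<String> future = new CompletableFuture<>();
        fetchPolicies().thenCombine(fetchLocalPolicies(), (policies, localPolicies) -> {
            // Build the result only after both lookups complete; no thread is parked waiting.
            future.complete("config[" + policies.orElse("defaults")
                    + ", " + localPolicies.orElse("no-local") + "]");
            return null;
        }).exceptionally(ex -> {
            // Propagate lookup failures instead of leaving the caller's future incomplete.
            future.completeExceptionally(ex);
            return null;
        });
        return future;
    }

    public static void main(String[] args) {
        System.out.println(getConfig().join());
    }
}

The caller-supplied future is completed from inside the combination stage, and exceptionally() propagates lookup failures instead of leaving the future forever incomplete.
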
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 29d1001e4bd..40fa540b9c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,119 +1190,139 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             Optional<Policies> policies = Optional.empty();
             Optional<LocalPolicies> localPolicies = Optional.empty();
-            PersistencePolicies persistencePolicies = null;
-            RetentionPolicies retentionPolicies = null;
-            OffloadPolicies topicLevelOffloadPolicies = null;
+            PersistencePolicies tmpPersistencePolicies = null;
+            RetentionPolicies tmpRetentionPolicies = null;
+            OffloadPolicies tmpTopicLevelOffloadPolicies = null;
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
                 try {
                     TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
-                        persistencePolicies = topicPolicies.getPersistence();
-                        retentionPolicies = topicPolicies.getRetentionPolicies();
-                        topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+                        tmpPersistencePolicies = topicPolicies.getPersistence();
+                        tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
+                        tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
                     }
                 } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                     log.debug("Topic {} policies have not been initialized yet.", topicName);
                 }
             }
-            try {
-                policies = pulsar
-                        .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
-                                namespace.toString()));
-                String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
-                localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
-            } catch (Throwable t) {
-                // Ignoring since if we don't have policies, we fallback on the default
-                log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
-                future.completeExceptionally(t);
-                return;
-            }
-
-            if (persistencePolicies == null) {
-                persistencePolicies = policies.map(p -> p.persistence).orElseGet(
-                        () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-            }
+            final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
+            final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
+            final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
+
+
+            CompletableFuture<Optional<Policies>> policiesFuture = pulsar
+                    .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
+                            namespace.toString()));
+            String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+            CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
+                    pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
+
+            policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
+                PersistencePolicies persistencePolicies = finalPersistencePolicies;
+                RetentionPolicies retentionPolicies = finalRetentionPolicies;
+                OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
+
+                if (persistencePolicies == null) {
+                    persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+                            () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                    serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                }
-            if (retentionPolicies == null) {
-                retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
-                        () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                serviceConfig.getDefaultRetentionSizeInMB())
-                );
-            }
+                if (retentionPolicies == null) {
+                    retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+                            () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
-            ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-            managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-            managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-            managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-            if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+                managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+                if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+                    managedLedgerConfig
+                            .setBookKeeperEnsemblePlacementPolicyClassName(
+                                    ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                    Map<String, Object> properties = Maps.newHashMap();
+                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                            localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                            localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                }
+                managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                managedLedgerConfig.setMaxUnackedRangesToPersist(
+                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
+                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+                managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                        TimeUnit.MINUTES);
+                managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                        TimeUnit.MINUTES);
+                managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                        serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                        serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
                 managedLedgerConfig
-                    .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
-                Map<String, Object> properties = Maps.newHashMap();
-                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
-                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
-                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-            }
-            managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-            managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-            managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-            managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-            managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-            managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-            managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                    TimeUnit.MINUTES);
-            managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                    TimeUnit.MINUTES);
-            managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                    serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-            managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-            managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-            managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                    serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-            managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-            managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-            managedLedgerConfig
-                    .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-            managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-            managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
-            managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-            managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-            managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
-            OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
-            OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
-                    topicLevelOffloadPolicies,
-                    OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
-                    getPulsar().getConfig().getProperties());
-            if (topicLevelOffloadPolicies != null) {
-                try {
-                    LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
-                    managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                } catch (PulsarServerException e) {
-                    future.completeExceptionally(e);
-                    return;
+                        .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                managedLedgerConfig.setLedgerRolloverTimeout(
+                        serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+                managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+                OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+                OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+                        topicLevelOffloadPolicies,
+                        OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+                        getPulsar().getConfig().getProperties());
+                if (topicLevelOffloadPolicies != null) {
+                    try {
+                        LedgerOffloader topicLevelLedgerOffLoader =
+                                pulsar().createManagedLedgerOffloader(offloadPolicies);
+                        managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                    } catch (PulsarServerException e) {
+                        future.completeExceptionally(e);
+                        return null;
+                    }
+                } else {
+                    //If the topic level policy is null, use the namespace level
+                    managedLedgerConfig.setLedgerOffloader(
+                            pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                 }
-            } else {
-                //If the topic level policy is null, use the namespace level
-                managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
-            }
-            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-            managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                        serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                        serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+
+
+                future.complete(managedLedgerConfig);
+                return null;
+            }).exceptionally(ex -> {
+                log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
+                future.completeExceptionally(ex);
+                return null;
+            });
-            future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
         return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 67f17d4a4c3..c2fd32a44fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @SuppressWarnings("unused")
     private volatile long usageCount = 0;
+
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1155,68 +1156,72 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             log.debug("[{}] Checking replication status", name);
         }
-        Policies policies = null;
-        try {
-            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, name.getNamespace()))
-                    .orElseThrow(() -> new KeeperException.NoNodeException());
-        } catch (Exception e) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(new ServerMetadataException(e));
-            return future;
-        }
-        //Ignore current broker's config for messageTTL for replication.
-        final int newMessageTTLinSeconds;
-        try {
-            newMessageTTLinSeconds = getMessageTTL();
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(new ServerMetadataException(e));
-        }
+        return brokerService.pulsar().getConfigurationCache().policiesCache()
+                .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
+                .thenCompose(optPolicies -> {
+                    if (!optPolicies.isPresent()) {
+                        return FutureUtil.failedFuture(
+                                new ServerMetadataException("Namespace not found: " + name.getNamespace()));
+                    }
-        Set<String> configuredClusters;
-        if (policies.replication_clusters != null) {
-            configuredClusters = Sets.newTreeSet(policies.replication_clusters);
-        } else {
-            configuredClusters = Collections.emptySet();
-        }
+                    Policies policies = optPolicies.get();
-        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+                    //Ignore current broker's config for messageTTL for replication.
+                    final int newMessageTTLinSeconds;
+                    try {
+                        newMessageTTLinSeconds = getMessageTTL();
+                    } catch (Exception e) {
+                        return FutureUtil.failedFuture(new ServerMetadataException(e));
+                    }
-        // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
-        // doesn't serve global topic without local repl-cluster configured.
-        if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
-            log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
-                    topic, configuredClusters);
-            return deleteForcefully();
-        }
+                    Set<String> configuredClusters;
+                    if (policies.replication_clusters != null) {
+                        configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+                    } else {
+                        configuredClusters = Collections.emptySet();
+                    }
-        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+                    // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
+                    // because pulsar
+                    // doesn't serve global topic without local repl-cluster configured.
+                    if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+                        log.info(
+                                "Deleting topic [{}] because local cluster is not part of global namespace repl list "
+                                        + "{}",
+                                topic, configuredClusters);
+                        return deleteForcefully();
+                    }
-        // Check for missing replicators
-        for (String cluster : configuredClusters) {
-            if (cluster.equals(localCluster)) {
-                continue;
-            }
+                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
-            if (!replicators.containsKey(cluster)) {
-                futures.add(startReplicator(cluster));
-            }
-        }
+                    // Check for missing replicators
+                    for (String cluster : configuredClusters) {
+                        if (cluster.equals(localCluster)) {
+                            continue;
+                        }
-        // Check for replicators to be stopped
-        replicators.forEach((cluster, replicator) -> {
-            // Update message TTL
-            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+                        if (!replicators.containsKey(cluster)) {
+                            futures.add(startReplicator(cluster));
+                        }
+                    }
-            if (!cluster.equals(localCluster)) {
-                if (!configuredClusters.contains(cluster)) {
-                    futures.add(removeReplicator(cluster));
-                }
-            }
+                    // Check for replicators to be stopped
+                    replicators.forEach((cluster, replicator) -> {
+                        // Update message TTL
+                        ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
-        });
+                        if (!cluster.equals(localCluster)) {
+                            if (!configuredClusters.contains(cluster)) {
+                                futures.add(removeReplicator(cluster));
+                            }
+                        }
-        return FutureUtil.waitForAll(futures);
+                    });
+
+                    return FutureUtil.waitForAll(futures);
+                });
     }
     @Override
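
The PersistentTopic#checkReplication change above follows the same pattern: the namespace policies are read with getAsync() and the rest of the check is chained with thenCompose, so the method returns a composed future instead of blocking on the policies cache. A minimal, self-contained sketch of that shape (hypothetical names, not the broker's actual types):

// Sketch of the checkReplication refactor: fetch policies asynchronously and chain the
// replication check with thenCompose, returning a composed future instead of blocking.
// Names are hypothetical placeholders, not Pulsar types.
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class AsyncReplicationCheckSketch {

    static CompletableFuture<Optional<String>> getPoliciesAsync(String namespace) {
        return CompletableFuture.supplyAsync(() -> Optional.of("policies-of-" + namespace));
    }

    static CompletableFuture<Void> checkReplication(String namespace) {
        return getPoliciesAsync(namespace).thenCompose(optPolicies -> {
            if (!optPolicies.isPresent()) {
                // Fail the composed future rather than throwing from a blocking call.
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("Namespace not found: " + namespace));
                return failed;
            }
            // With the policies in hand the broker would start/stop replicators and wait for
            // all of them; the sketch simply completes to stay self-contained.
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    public static void main(String[] args) {
        checkReplication("tenant/ns").join();
        System.out.println("replication check done");
    }
}

Returning a failed future for the missing-namespace case keeps the error on the asynchronous path, mirroring the ServerMetadataException handling in the diff.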