This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 1dada32 [Branch-2.8]fix cherry-pick issue (#12397)
1dada32 is described below
commit 1dada32e2cf13656f433ce867e1b00b9c156957b
Author: Hang Chen <[email protected]>
AuthorDate: Thu Oct 21 16:48:00 2021 +0800
[Branch-2.8]fix cherry-pick issue (#12397)
### Motivation
Fix branch-2.8 cherry-pick issues, including unresolved merge-conflict markers left in PulsarCluster.java.
---
.../apache/pulsar/broker/service/AbstractTopic.java | 14 +++++++-------
.../apache/pulsar/broker/service/BrokerService.java | 17 +++++++++--------
.../service/persistent/DispatchRateLimiter.java | 8 +++++---
.../apache/pulsar/broker/web/PulsarWebResource.java | 1 -
.../apache/pulsar/broker/service/ReplicatorTest.java | 3 ++-
.../tests/integration/topologies/PulsarCluster.java | 20 --------------------
6 files changed, 23 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 62f8fe1..ff08aba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -146,11 +146,11 @@ public abstract class AbstractTopic implements Topic {
Policies policies;
try {
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
+ .orElseGet(Policies::new);
} catch (Exception e) {
policies = new Policies();
}
-
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers :
brokerService.pulsar()
@@ -193,10 +193,12 @@ public abstract class AbstractTopic implements Topic {
// Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ if (policies == null) {
+ policies = new Policies();
+ }
} catch (Exception e) {
policies = new Policies();
}
-
maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -773,10 +775,8 @@ public abstract class AbstractTopic implements Topic {
policies = optPolicies.get();
} else {
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
- if (policies == null) {
- policies = new Policies();
- }
+ .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
+ .orElseGet(Policies::new);
}
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 98b2f8f..a0239aa 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2568,11 +2568,12 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoTopicCreationOverride getAutoTopicCreationOverride(final
TopicName topicName) {
try {
- Policies policies = pulsar.getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+
// If namespace policies have the field set, it will override the
broker-level setting
- if (policies != null && policies.autoTopicCreationOverride !=
null) {
- return policies.autoTopicCreationOverride;
+ if (policies.isPresent() &&
policies.get().autoTopicCreationOverride != null) {
+ return policies.get().autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the
default
@@ -2601,11 +2602,11 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
- Policies policies = pulsar.getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES,
topicName.getNamespace()));
// If namespace policies have the field set, it will override the
broker-level setting
- if (policies != null && policies.autoSubscriptionCreationOverride
!= null) {
- return policies.autoSubscriptionCreationOverride;
+ if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
+ return policies.get().autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the
default
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 91eb074..229ae17 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -324,16 +324,18 @@ public class DispatchRateLimiter {
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
final String path = path(POLICIES, namespace.toString());
- Policies policies = null;
+ Optional<Policies >policies = Optional.empty();
try {
ConfigurationCacheService configurationCacheService =
brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
- policies =
configurationCacheService.policiesCache().getDataIfPresent(path);
+ policies =
configurationCacheService.policiesCache().getAsync(path)
+
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
+ TimeUnit.SECONDS);
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
- return Optional.ofNullable(policies);
+ return policies;
}
/**
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index b8645c8..f0ee3a1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.web;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
-import static
org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import com.fasterxml.jackson.databind.ObjectMapper;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 071160b0..29d86d6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1216,7 +1216,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
Replicator replicator =
pulsar1.getBrokerService().getTopicReference(topicName)
.get().getReplicators().get(cluster4);
- Awaitility.await().untilAsserted(() ->
Assert.assertTrue(replicator.isConnected()));
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
admin1.clusters().deleteCluster(cluster4);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 611d8bd..5702370 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -646,24 +646,4 @@ public class PulsarCluster {
"namespaces", "set-deduplication", "public/" + nsName,
enabled ? "--enable" : "--disable");
}
-
-<<<<<<< HEAD
-=======
- public void dumpFunctionLogs(String name) {
- for (WorkerContainer container : getAlWorkers()) {
- log.info("Trying to get function {} logs from container {}", name,
container.getContainerName());
- try {
- String logFile = "/pulsar/logs/functions/public/default/" +
name + "/" + name + "-0.log";
- String logs = container.<String>copyFileFromContainer(logFile,
(inputStream) -> {
- return IOUtils.toString(inputStream, "utf-8");
- });
- log.info("Function {} logs {}", name, logs);
- } catch (com.github.dockerjava.api.exception.NotFoundException
notFound) {
- log.info("Cannot download {} logs from {} not found exception
{}", name, container.getContainerName(), notFound.toString());
- } catch (Throwable err) {
- log.info("Cannot download {} logs from {}", name,
container.getContainerName(), err);
- }
- }
- }
->>>>>>> a69611c287d (fix logger number not correct in tests (#12168))
}