This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 1dada32  [Branch-2.8]fix cherry-pick issue (#12397)
1dada32 is described below

commit 1dada32e2cf13656f433ce867e1b00b9c156957b
Author: Hang Chen <[email protected]>
AuthorDate: Thu Oct 21 16:48:00 2021 +0800

    [Branch-2.8]fix cherry-pick issue (#12397)
    
    ### Motivation
    Fix branch 2.8 cherry-pick issue.
---
 .../apache/pulsar/broker/service/AbstractTopic.java  | 14 +++++++-------
 .../apache/pulsar/broker/service/BrokerService.java  | 17 +++++++++--------
 .../service/persistent/DispatchRateLimiter.java      |  8 +++++---
 .../apache/pulsar/broker/web/PulsarWebResource.java  |  1 -
 .../apache/pulsar/broker/service/ReplicatorTest.java |  3 ++-
 .../tests/integration/topologies/PulsarCluster.java  | 20 --------------------
 6 files changed, 23 insertions(+), 40 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 62f8fe1..ff08aba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -146,11 +146,11 @@ public abstract class AbstractTopic implements Topic {
             Policies policies;
             try {
                 policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                        .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+                        .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
+                    .orElseGet(Policies::new);
             } catch (Exception e) {
                 policies = new Policies();
             }
-
             maxProducers = policies.max_producers_per_topic;
         }
         maxProducers = maxProducers != null ? maxProducers : 
brokerService.pulsar()
@@ -193,10 +193,12 @@ public abstract class AbstractTopic implements Topic {
                 // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
                 policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
                         .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+                if (policies == null) {
+                    policies = new Policies();
+                }
             } catch (Exception e) {
                 policies = new Policies();
             }
-
             maxConsumers = policies.max_consumers_per_topic;
         }
         final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -773,10 +775,8 @@ public abstract class AbstractTopic implements Topic {
                 policies = optPolicies.get();
             } else {
                 policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
-                if (policies == null) {
-                    policies = new Policies();
-                }
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
+                    .orElseGet(Policies::new);
             }
         } catch (Exception e) {
             log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 98b2f8f..a0239aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2568,11 +2568,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     private AutoTopicCreationOverride getAutoTopicCreationOverride(final 
TopicName topicName) {
         try {
-            Policies policies = pulsar.getConfigurationCache().policiesCache()
-                            .getDataIfPresent(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
+                .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+
             // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies != null && policies.autoTopicCreationOverride != 
null) {
-                return policies.autoTopicCreationOverride;
+            if (policies.isPresent() && 
policies.get().autoTopicCreationOverride != null) {
+                return policies.get().autoTopicCreationOverride;
             }
         } catch (Throwable t) {
             // Ignoring since if we don't have policies, we fallback on the 
default
@@ -2601,11 +2602,11 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
         try {
-            Policies policies = pulsar.getConfigurationCache().policiesCache()
-                    .getDataIfPresent(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, 
topicName.getNamespace()));
             // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies != null && policies.autoSubscriptionCreationOverride 
!= null) {
-                return policies.autoSubscriptionCreationOverride;
+            if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
+                return policies.get().autoSubscriptionCreationOverride;
             }
         } catch (Throwable t) {
             // Ignoring since if we don't have policies, we fallback on the 
default
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 91eb074..229ae17 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -324,16 +324,18 @@ public class DispatchRateLimiter {
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
         final String path = path(POLICIES, namespace.toString());
-        Policies policies = null;
+        Optional<Policies >policies = Optional.empty();
         try {
             ConfigurationCacheService configurationCacheService = 
brokerService.pulsar().getConfigurationCache();
             if (configurationCacheService != null) {
-                policies = 
configurationCacheService.policiesCache().getDataIfPresent(path);
+                policies = 
configurationCacheService.policiesCache().getAsync(path)
+                    
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
+                        TimeUnit.SECONDS);
             }
         } catch (Exception e) {
             log.warn("Failed to get message-rate for {} ", topicName, e);
         }
-        return Optional.ofNullable(policies);
+        return policies;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index b8645c8..f0ee3a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.web;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
-import static 
org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
 import com.fasterxml.jackson.databind.ObjectMapper;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 071160b0..29d86d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1216,7 +1216,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Replicator replicator = 
pulsar1.getBrokerService().getTopicReference(topicName)
                 .get().getReplicators().get(cluster4);
 
-        Awaitility.await().untilAsserted(() -> 
Assert.assertTrue(replicator.isConnected()));
+        Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+            .untilAsserted(() -> Assert.assertTrue(replicator.isConnected()));
 
         admin1.clusters().deleteCluster(cluster4);
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 611d8bd..5702370 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -646,24 +646,4 @@ public class PulsarCluster {
             "namespaces", "set-deduplication", "public/" + nsName,
             enabled ? "--enable" : "--disable");
     }
-
-<<<<<<< HEAD
-=======
-    public void dumpFunctionLogs(String name) {
-        for (WorkerContainer container : getAlWorkers()) {
-            log.info("Trying to get function {} logs from container {}", name, 
container.getContainerName());
-            try {
-                String logFile = "/pulsar/logs/functions/public/default/" + 
name + "/" + name + "-0.log";
-                String logs = container.<String>copyFileFromContainer(logFile, 
(inputStream) -> {
-                    return IOUtils.toString(inputStream, "utf-8");
-                });
-                log.info("Function {} logs {}", name, logs);
-            } catch (com.github.dockerjava.api.exception.NotFoundException 
notFound) {
-                log.info("Cannot download {} logs from {} not found exception 
{}", name, container.getContainerName(), notFound.toString());
-            } catch (Throwable err) {
-                log.info("Cannot download {} logs from {}", name, 
container.getContainerName(), err);
-            }
-        }
-    }
->>>>>>> a69611c287d (fix logger number not correct in tests (#12168))
 }

Reply via email to