This is an automated email from the ASF dual-hosted git repository.

yushiga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c308bd5  Fixed merge issues with DestinationName renaming (#1282)
c308bd5 is described below

commit c308bd5f81a1eb2c63d4d2dc3ddb652690a74bec
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Feb 26 01:33:55 2018 -0800

    Fixed merge issues with DestinationName renaming (#1282)
---
 .../NonPersistentDispatcherSingleActiveConsumer.java      |  6 +++---
 .../broker/service/nonpersistent/NonPersistentTopic.java  | 11 ++++-------
 .../persistent/PersistentDispatcherMultipleConsumers.java |  8 ++++----
 .../PersistentDispatcherSingleActiveConsumer.java         | 15 +++++++--------
 .../pulsar/broker/service/persistent/PersistentTopic.java |  4 ++--
 .../apache/pulsar/broker/service/PersistentTopicTest.java | 13 +++++++------
 6 files changed, 27 insertions(+), 30 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 092dc36..2083cc7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -31,7 +31,7 @@ import 
org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
@@ -70,7 +70,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -88,7 +88,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index d8510df..2ec8042 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
@@ -50,8 +49,6 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.Replicator;
@@ -268,7 +265,7 @@ public class NonPersistentTopic implements Topic {
         Policies policies;
         try {
             policies =  
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -591,7 +588,7 @@ public class NonPersistentTopic implements Topic {
         }
         return isReplicatorStarted.get();
     }
-    
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -976,7 +973,7 @@ public class NonPersistentTopic implements Topic {
         this.hasBatchMessagePublished = true;
     }
 
-    
-    
+
+
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2e59811..54e40b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -40,13 +40,13 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
-import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
@@ -139,7 +139,7 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -157,7 +157,7 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bd197e8..cc30836 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -23,9 +23,8 @@ import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -42,7 +41,7 @@ import 
org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -53,7 +52,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
     private final String name;
-    
+
     private boolean havePendingRead = false;
 
     private static final int MaxReadBatchSize = 100;
@@ -122,7 +121,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -140,7 +139,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
         Policies policies;
         try {
             policies = 
topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -201,7 +200,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
                 if (future.isSuccess()) {
                     // acquire message-dispatch permits for already delivered 
messages
                     if 
(serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || 
!cursor.isActive()) {
-                        
topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, 
totalBytesSent);    
+                        
topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, 
totalBytesSent);
                     }
                     // Schedule a new read batch operation only after the 
previous batch has been written to the socket
                     synchronized 
(PersistentDispatcherSingleActiveConsumer.this) {
@@ -303,7 +302,7 @@ public final class PersistentDispatcherSingleActiveConsumer 
extends AbstractDisp
             }
 
             int messagesToRead = Math.min(availablePermits, readBatchSize);
-            
+
             // throttle only if: (1) cursor is not active (or flag for 
throttle-nonBacklogConsumer is enabled) bcz
             // active-cursor reads message from cache rather from bookkeeper 
(2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d5d974..62017f0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -351,7 +351,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         Policies policies;
         try {
             policies =  
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -911,7 +911,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
                 boolean isReplicatorStarted = 
addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, 
localCluster);
                 if (isReplicatorStarted) {
-                    future.complete(null);    
+                    future.complete(null);
                 } else {
                     future.completeExceptionally(new NamingException(
                             PersistentTopic.this.getName() + " Failed to start 
replicator " + remoteCluster));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d0277ef..c1b44c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -39,7 +39,6 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -89,12 +88,12 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
@@ -111,6 +110,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -405,7 +406,7 @@ public class PersistentTopicTest {
         Policies policies = new Policies();
         policies.max_producers_per_topic = 2;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, 
DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
         testMaxProducers();
     }
@@ -576,7 +577,7 @@ public class PersistentTopicTest {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, 
DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersShared();
@@ -667,7 +668,7 @@ public class PersistentTopicTest {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, 
DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, 
TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersFailover();

-- 
To stop receiving notification emails like this one, please contact
yush...@apache.org.

Reply via email to