This is an automated email from the ASF dual-hosted git repository. yushiga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c308bd5 Fixed merge issues with DestinationName renaming (#1282) c308bd5 is described below commit c308bd5f81a1eb2c63d4d2dc3ddb652690a74bec Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Feb 26 01:33:55 2018 -0800 Fixed merge issues with DestinationName renaming (#1282) --- .../NonPersistentDispatcherSingleActiveConsumer.java | 6 +++--- .../broker/service/nonpersistent/NonPersistentTopic.java | 11 ++++------- .../persistent/PersistentDispatcherMultipleConsumers.java | 8 ++++---- .../PersistentDispatcherSingleActiveConsumer.java | 15 +++++++-------- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- .../apache/pulsar/broker/service/PersistentTopicTest.java | 13 +++++++------ 6 files changed, 27 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 092dc36..2083cc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -31,7 +31,7 @@ import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher { @@ -70,7 +70,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -88,7 +88,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index d8510df..2ec8042 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.util.OrderedSafeExecutor; @@ -50,8 +49,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyE import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; -import org.apache.pulsar.broker.service.persistent.PersistentReplicator; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Replicator; @@ -268,7 +265,7 @@ public class NonPersistentTopic implements Topic { Policies policies; try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -591,7 +588,7 @@ public class NonPersistentTopic implements Topic { } return isReplicatorStarted.get(); } - + CompletableFuture<Void> removeReplicator(String remoteCluster) { log.info("[{}] Removing replicator to {}", topic, remoteCluster); final CompletableFuture<Void> future = new CompletableFuture<>(); @@ -976,7 +973,7 @@ public class NonPersistentTopic implements Topic { this.hasBatchMessagePublished = true; } - - + + private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 2e59811..54e40b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -40,13 +40,13 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; @@ -139,7 +139,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -157,7 +157,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index bd197e8..cc30836 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -23,9 +23,8 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; @@ -42,7 +41,7 @@ import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; @@ -53,7 +52,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private final PersistentTopic topic; private final ManagedCursor cursor; private final String name; - + private boolean havePendingRead = false; private static final int MaxReadBatchSize = 100; @@ -122,7 +121,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -140,7 +139,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp Policies policies; try { policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -201,7 +200,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp if (future.isSuccess()) { // acquire message-dispatch permits for already delivered messages if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); + topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent); } // Schedule a new read batch operation only after the previous batch has been written to the socket synchronized (PersistentDispatcherSingleActiveConsumer.this) { @@ -303,7 +302,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp } int messagesToRead = Math.min(availablePermits, readBatchSize); - + // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d5d974..62017f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -351,7 +351,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { Policies policies; try { policies = brokerService.pulsar().getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace())) + .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) .orElseGet(() -> new Policies()); } catch (Exception e) { policies = new Policies(); @@ -911,7 +911,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster); if (isReplicatorStarted) { - future.complete(null); + future.complete(null); } else { future.completeExceptionally(new NamingException( PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d0277ef..c1b44c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -39,7 +39,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.common.collect.ImmutableMap; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -89,12 +88,12 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; @@ -111,6 +110,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -405,7 +406,7 @@ public class PersistentTopicTest { Policies policies = new Policies(); policies.max_producers_per_topic = 2; when(pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace()))) + .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(policies)); testMaxProducers(); } @@ -576,7 +577,7 @@ public class PersistentTopicTest { policies.max_consumers_per_subscription = 2; policies.max_consumers_per_topic = 3; when(pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace()))) + .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(policies)); testMaxConsumersShared(); @@ -667,7 +668,7 @@ public class PersistentTopicTest { policies.max_consumers_per_subscription = 2; policies.max_consumers_per_topic = 3; when(pulsar.getConfigurationCache().policiesCache() - .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace()))) + .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace()))) .thenReturn(Optional.of(policies)); testMaxConsumersFailover(); -- To stop receiving notification emails like this one, please contact yush...@apache.org.