rdhabalia commented on a change in pull request #1262: Broker should not start 
replicator for root partitioned-topic
URL: https://github.com/apache/incubator-pulsar/pull/1262#discussion_r169757688
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 ##########
 @@ -211,5 +215,42 @@ public static String getReplicatorName(String 
replicatorPrefix, String cluster)
         return (replicatorPrefix + "." + cluster).intern();
     }
 
+    /**
+     * Replication can't be started on root-partitioned-topic to avoid 
producer startup conflict.
+     * 
+     * <pre>
+     * eg:
+     * if topic : persistent://prop/cluster/ns/my-topic is a partitioned topic 
with 2 partitions then
+     * broker explicitly creates replicator producer for: 
"my-topic-partition-1" and "my-topic-partition-2".
+     * 
+     * However, if broker tries to start producer with root topic "my-topic" 
then client-lib internally creates individual 
+     * producers for "my-topic-partition-1" and "my-topic-partition-2" which 
creates conflict with existing 
+     * replicator producers.
+     * </pre>
+     * 
+     * Therefore, replicator can't be started on root-partition topic which 
can internally create multiple partitioned
+     * producers.
+     * 
+     * @param topicName
+     * @param brokerService
+     */
+    private void validatePartitionedTopic(String topicName, BrokerService 
brokerService) throws NamingException {
+        DestinationName destination = DestinationName.get(topicName);
+        String partitionedTopicPath = 
path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE,
+                destination.getNamespace().toString(), 
destination.getDomain().toString(),
+                destination.getEncodedLocalName());
+        boolean isPartitionedTopic = false;
+        try {
+            isPartitionedTopic = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(partitionedTopicPath).isPresent();
+        } catch (Exception e) {
+            log.warn("Failed to verify partitioned topic {}-{}", topicName, 
e.getMessage());
 
 Review comment:
   for any case, if we don't find the znode or get exception then we don't want 
to make it critical so, we can avoid topic loading failure.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to