poorbarcode commented on code in PR #25312:
URL: https://github.com/apache/pulsar/pull/25312#discussion_r2937851628


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java:
##########
@@ -669,6 +676,136 @@ public void testSetClustersAndAllowedClusters() throws 
Exception {
         }
     }
 
+    @DataProvider
+    public Object[][] localSystemTopicPartitions() {
+        return new Object[][] {
+                {0},
+                {3}
+        };
+    }
+
+    @Test(dataProvider = "localSystemTopicPartitions")
+    public void testSystemTopicCreationWithDifferentTopicCreationRule(int 
localSystemTopicPartitions) throws Exception {
+        String ns = BrokerTestUtil.newUniqueName(defaultTenant + "/ns");
+        Predicate<String> topicNameFilter = t -> 
TopicName.get(t).getNamespace().equals(ns);
+        String systemTopic = "persistent://" + ns + "/__change_events";
+        admin1.namespaces().createNamespace(ns);
+        admin1.namespaces().setNamespaceReplicationClusters(ns, new 
HashSet<>(Arrays.asList(cluster1)), false);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin1.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+            
assertEquals(admin2.namespaces().getNamespaceReplicationClusters(ns).size(), 1);
+        });
+
+        // Trigger system topic creation on cluster1, following {@param 
localSystemTopicPartitions}.
+        AutoTopicCreationOverride autoTopicCreation1 = null;
+        if (localSystemTopicPartitions == 0) {
+            autoTopicCreation1 = 
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                    .topicType("non-partitioned").build();
+        } else {
+            autoTopicCreation1 = 
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                    
.topicType("partitioned").defaultNumPartitions(localSystemTopicPartitions).build();
+        }
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation1);
+        Awaitility.await().untilAsserted(() -> {
+            AutoTopicCreationOverride autoTopicCreationOverride =
+                    admin1.namespaces().getAutoTopicCreationAsync(ns).get(3, 
TimeUnit.SECONDS);
+            assertNotNull(autoTopicCreationOverride);
+            if (localSystemTopicPartitions == 0) {
+                
assertTrue("non-partitioned".equalsIgnoreCase(autoTopicCreationOverride.getTopicType()));
+            } else {
+                
assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(), 
localSystemTopicPartitions);
+            }
+        });
+        // Use a topic loading to trigger system topic creation.
+        String topicUsedToTriggerSystemTopic = 
BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
+        
admin1.topics().createNonPartitionedTopic(topicUsedToTriggerSystemTopic);
+        admin1.topics().delete(topicUsedToTriggerSystemTopic, false);
+        // Verify: the system topic was created as expected.
+        Awaitility.await().untilAsserted(() -> {
+            TopicExistsInfo existsInfo = pulsar1.getNamespaceService()
+                    .checkTopicExistsAsync(TopicName.get(systemTopic)).get(3, 
TimeUnit.SECONDS);
+            assertTrue(existsInfo.isExists());
+            if (localSystemTopicPartitions == 0) {
+                assertEquals(existsInfo.getTopicType(), 
TopicType.NON_PARTITIONED);
+            } else {
+                assertEquals(existsInfo.getTopicType(), TopicType.PARTITIONED);
+                assertEquals(existsInfo.getPartitions(), 
localSystemTopicPartitions);
+            }
+        });
+
+        // Enable replication.
+        // Set topic auto-creation rule to "partitions: 2".
+        final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + 
"/tp");
+        final Set<String> clusters = new HashSet<>(Arrays.asList(cluster1, 
cluster2));
+        admin1.namespaces().setNamespaceReplicationClusters(ns, clusters, 
true);
+        AutoTopicCreationOverride autoTopicCreation2 =
+                
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
+                        
.topicType("partitioned").defaultNumPartitions(2).build();
+        admin1.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation2);
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(admin1.namespaces().getAutoTopicCreationAsync(ns).join()
+                    .getDefaultNumPartitions(), 2);
+            
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join()
+                    .getDefaultNumPartitions(), 2);
+        });
+
+        admin2.topics().createNonPartitionedTopic(tp);
+        Producer<String> p2 = 
client2.newProducer(Schema.STRING).topic(tp).create();
+        p2.send("msg-1");
+        p2.close();
+        Producer<String> p1 = 
client1.newProducer(Schema.STRING).topic(tp).create();
+        p1.send("msg-1");
+        p1.close();
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
broker2.getTopic(tp, false).join().get();
+            assertFalse(persistentTopic2.getReplicators().isEmpty());
+        });
+
+        // Verify: the topics are the same between two clusters.
+        Awaitility.await().untilAsserted(() -> {
+            List<String> topics1 = 
pulsar1.getBrokerService().getTopics().keySet()
+                    
.stream().filter(topicNameFilter).collect(Collectors.toList());
+            List<String> topics2 = 
pulsar2.getBrokerService().getTopics().keySet()
+                    
.stream().filter(topicNameFilter).collect(Collectors.toList());
+            Collections.sort(topics1);
+            Collections.sort(topics2);
+            boolean systemTopicCreated1 = false;
+            for (String tp1 : topics1) {
+                if (tp1.contains("__change_events")) {
+                    systemTopicCreated1 = true;
+                    break;
+                }
+            }
+            boolean systemTopicCreated2 = false;
+            for (String tp2 : topics2) {
+                if (tp2.contains("__change_events")) {
+                    systemTopicCreated2 = true;
+                    break;
+                }
+            }
+            log.info("topics1: {}", topics1);
+            log.info("topics2: {}", topics2);
+            assertTrue(systemTopicCreated1);
+            assertTrue(systemTopicCreated2);
+            assertEquals(topics1, topics2);
+        });
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(tp, Arrays.asList(cluster1));
+        admin2.topics().setReplicationClusters(tp, Arrays.asList(cluster2));
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopic persistentTopic1 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic1.getReplicators().isEmpty());
+            PersistentTopic persistentTopic2 = (PersistentTopic) 
broker1.getTopic(tp, false).join().get();
+            assertTrue(persistentTopic2.getReplicators().isEmpty());

Review Comment:
   Modified



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to