poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
             admin2.topics().delete(topicName);
         });
     }
+
+    @Test
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+        final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+        // Check the partitioned topic has been created at the remote cluster.
+        PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+        final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+        final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        admin2.topics().createPartitionedTopic(topicName, 2);
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+        // Check the partitioned topic has been created at the remote cluster.
+        PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 2);
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        waitReplicatorStopped(partition0);
+        waitReplicatorStopped(partition1);
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    @Test
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+        admin2.topics().createPartitionedTopic(topicName, 3);
+        admin1.topics().createPartitionedTopic(topicName, 2);
+        try {
+            admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+            fail("Expected error due to a conflict partitioned topic already 
exists.");
+        } catch (Exception ex) {
+            Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+            assertTrue(unWrapEx.getMessage().contains("with different 
partitions"));
+        }
+        // Check nothing changed.
+        PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+        assertEquals(topicMetadata2.partitions, 3);
+        assertEquals(admin1.topics().getReplicationClusters(topicName, 
true).size(), 1);
+        // cleanup.
+        admin1.topics().deletePartitionedTopic(topicName);
+        admin2.topics().deletePartitionedTopic(topicName);
+    }
+
+    /**
+     * TODO next PR will correct the behavior below, just left this test here.
+     */
+    // @Test
+    private void testNamespaceLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to