poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742
########## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ########## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + + @Test + public void testPartitionedTopicLevelReplication() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + final String partition0 = TopicName.get(topicName).getPartition(0).toString(); + final String partition1 = TopicName.get(topicName).getPartition(1).toString(); + admin1.topics().createPartitionedTopic(topicName, 2); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + // Check the partitioned topic has been created at the remote cluster. + PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); + assertEquals(topicMetadata2.partitions, 2); + // cleanup. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(partition0); + waitReplicatorStopped(partition1); + admin1.topics().deletePartitionedTopic(topicName); + admin2.topics().deletePartitionedTopic(topicName); + } + + @Test + public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + final String partition0 = TopicName.get(topicName).getPartition(0).toString(); + final String partition1 = TopicName.get(topicName).getPartition(1).toString(); + admin1.topics().createPartitionedTopic(topicName, 2); + admin2.topics().createPartitionedTopic(topicName, 2); + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + // Check the partitioned topic has been created at the remote cluster. 
+ PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); + assertEquals(topicMetadata2.partitions, 2); + // cleanup. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + waitReplicatorStopped(partition0); + waitReplicatorStopped(partition1); + admin1.topics().deletePartitionedTopic(topicName); + admin2.topics().deletePartitionedTopic(topicName); + } + + @Test + public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + admin2.topics().createPartitionedTopic(topicName, 3); + admin1.topics().createPartitionedTopic(topicName, 2); + try { + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + fail("Expected error due to a conflict partitioned topic already exists."); + } catch (Exception ex) { + Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unWrapEx.getMessage().contains("with different partitions")); + } + // Check nothing changed. + PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); + assertEquals(topicMetadata2.partitions, 3); + assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1); + // cleanup. + admin1.topics().deletePartitionedTopic(topicName); + admin2.topics().deletePartitionedTopic(topicName); + } + + /** + * TODO next PR will correct the behavior below, just left this test here. + */ + // @Test + private void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org