This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 5f38eaff282 [fix][broker] partitioned __change_events topic is policy 
topic (#20392)
5f38eaff282 is described below

commit 5f38eaff282c890d19902e02adc512e8bfb2a788
Author: Michael Marshall <mmarsh...@apache.org>
AuthorDate: Thu May 25 03:46:31 2023 -0500

    [fix][broker] partitioned __change_events topic is policy topic (#20392)
    
    (cherry picked from commit 9918bced4465e0b0746a7959550c90cb76ae945f)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/common/naming/SystemTopicNames.java     |  6 +--
 .../pulsar/common/naming/SystemTopicNamesTest.java | 47 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 431be4d0811..22cd0b7d123 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -712,7 +712,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             }
 
             try {
-                if 
(!topic.endsWith(SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)
+                if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
                         && !checkSubscriptionTypesEnable(subType)) {
                     return FutureUtil.failedFuture(
                             new NotAllowedException("Topic[{" + topic + "}] 
doesn't support "
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index eaab8261460..dd78ed804f6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -71,7 +71,7 @@ public class SystemTopicNames {
         if (topic == null) {
             return false;
         }
-        return 
TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+        return 
TopicName.getPartitionedTopicName(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
     }
 
     public static boolean isTransactionInternalName(TopicName topicName) {
@@ -82,7 +82,7 @@ public class SystemTopicNames {
     }
 
     public static boolean isSystemTopic(TopicName topicName) {
-        TopicName nonePartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-        return isEventSystemTopic(nonePartitionedTopicName) || 
isTransactionInternalName(nonePartitionedTopicName);
+        TopicName nonPartitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        return isEventSystemTopic(nonPartitionedTopicName) || 
isTransactionInternalName(nonPartitionedTopicName);
     }
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
new file mode 100644
index 00000000000..f919ab02b54
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/SystemTopicNamesTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import static org.testng.AssertJUnit.assertEquals;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class SystemTopicNamesTest {
+
+    @DataProvider(name = "topicPoliciesSystemTopicNames")
+    public static Object[][] topicPoliciesSystemTopicNames() {
+        return new Object[][] {
+                {"persistent://public/default/__change_events", true},
+                {"persistent://public/default/__change_events-partition-0", 
true},
+                {"persistent://random-tenant/random-ns/__change_events", true},
+                
{"persistent://random-tenant/random-ns/__change_events-partition-1", true},
+                {"persistent://public/default/not_really__change_events", 
false},
+                {"persistent://public/default/__change_events-diff-suffix", 
false},
+                {"persistent://a/b/not_really__change_events", false},
+        };
+    }
+
+    @Test(dataProvider = "topicPoliciesSystemTopicNames")
+    public void testIsTopicPoliciesSystemTopic(String topicName, boolean 
expectedResult) {
+        assertEquals(expectedResult, 
SystemTopicNames.isTopicPoliciesSystemTopic(topicName));
+        assertEquals(expectedResult, 
SystemTopicNames.isSystemTopic(TopicName.get(topicName)));
+        assertEquals(expectedResult, 
SystemTopicNames.isEventSystemTopic(TopicName.get(topicName)));
+    }
+}

Reply via email to