This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4129583c418 [fix][misc] do not require encryption on system topics (#18898)
4129583c418 is described below

commit 4129583c418dd68f8303dee601132e2910cdf8e6
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Dec 14 14:07:15 2022 +0100

    [fix][misc] do not require encryption on system topics (#18898)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++++-
 .../broker/service/persistent/SystemTopic.java     |  6 ++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 14 ++++++++++++
 .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9e4304ff6e8..d8e4f517690 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -142,6 +142,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -1326,7 +1327,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
                 backlogQuotaCheckFuture.thenRun(() -> {
                     // Check whether the producer will publish encrypted messages or not
-                    if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted) {
+                    if ((topic.isEncryptionRequired() || encryptionRequireOnProducer)
+                            && !isEncrypted
+                            && !SystemTopicNames.isSystemTopic(topicName)) {
                         String msg = String.format("Encryption is required in %s", topicName);
                         log.warn("[{}] {}", remoteAddress, msg);
                         if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 4a9d9b0b2d5..395a8c9075e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -76,4 +76,10 @@ public class SystemTopic extends PersistentTopic {
         // even though is not explicitly set in the policies.
         return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
     }
+
+    @Override
+    public boolean isEncryptionRequired() {
+        // System topics are only written by the broker that can't know the encryption context.
+        return false;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 94cd299e475..db26f250067 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -3109,4 +3110,17 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         assertNull(topicPolicies);
     }
 
+    @Test
+    public void testProduceChangesWithEncryptionRequired() throws Exception {
+        final String beforeLac = admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
+        admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
+        // just an update to trigger writes on __change_events
+        admin.topicPolicies().setMaxConsumers(testTopic, 5);
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    final PersistentTopicInternalStats newLac = admin.topics().getInternalStats(topicPolicyEventsTopic);
+                    assertNotEquals(newLac, beforeLac);
+                });
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f075b80ef7a..c237b5024c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1617,4 +1617,29 @@ public class TransactionTest extends TransactionTestBase {
         Transaction abortingTxn = transaction;
         Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
     }
+
+
+    @Test
+    public void testEncryptionRequired() throws Exception {
+        final String namespace = "tnx/ns-prechecks";
+        final String topic = "persistent://" + namespace + "/test_transaction_topic";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setEncryptionRequiredStatus(namespace, true);
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .addEncryptionKey("my-app-key")
+                .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+                .create();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+        producer.newMessage(txn)
+                    .value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                    .send();
+        txn.commit();
+    }
 }

Reply via email to