This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 814f2ec9612 [fix][misc] do not require encryption on system topics 
(#18898)
814f2ec9612 is described below

commit 814f2ec961263e594c71a86e6e1ddb55a5cb24da
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed Dec 14 14:07:15 2022 +0100

    [fix][misc] do not require encryption on system topics (#18898)
    
    (cherry picked from commit 4129583c418dd68f8303dee601132e2910cdf8e6)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++++-
 .../broker/service/persistent/SystemTopic.java     |  6 ++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 15 ++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
 4 files changed, 49 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3d221c88775..af0b1ca17e1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -137,6 +137,7 @@ import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -1318,7 +1319,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                 backlogQuotaCheckFuture.thenRun(() -> {
                     // Check whether the producer will publish encrypted 
messages or not
-                    if ((topic.isEncryptionRequired() || 
encryptionRequireOnProducer) && !isEncrypted) {
+                    if ((topic.isEncryptionRequired() || 
encryptionRequireOnProducer)
+                            && !isEncrypted
+                            && !SystemTopicNames.isSystemTopic(topicName)) {
                         String msg = String.format("Encryption is required in 
%s", topicName);
                         log.warn("[{}] {}", remoteAddress, msg);
                         if (producerFuture.completeExceptionally(new 
ServerMetadataException(msg))) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 715a684902f..1a7cf208382 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -77,4 +77,10 @@ public class SystemTopic extends PersistentTopic {
         // even though is not explicitly set in the policies.
         return !NamespaceService.isHeartbeatNamespace(TopicName.get(topic));
     }
+
+    @Override
+    public boolean isEncryptionRequired() {
+        // System topics are only written by the broker that can't know the 
encryption context.
+        return false;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 4eb76b72768..ef458a6a2d0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -3060,5 +3061,17 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
         // chunk message send success
         producer.send(new byte[2000]);
     }
-
+       
+    @Test
+    public void testProduceChangesWithEncryptionRequired() throws Exception {
+        final String beforeLac = 
admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
+        admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
+        // just an update to trigger writes on __change_events
+        admin.topicPolicies().setMaxConsumers(testTopic, 5);
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    final PersistentTopicInternalStats newLac = 
admin.topics().getInternalStats(topicPolicyEventsTopic);
+                    assertNotEquals(newLac, beforeLac);
+                });
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f0417575446..76b91801a73 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -40,6 +40,7 @@ import io.netty.util.Timeout;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1429,4 +1430,28 @@ public class TransactionTest extends TransactionTestBase 
{
             Assert.assertTrue(t instanceof 
BrokerServiceException.ServiceUnitNotReadyException);
         }
     }
+
+    @Test
+    public void testEncryptionRequired() throws Exception {
+        final String namespace = "tnx/ns-prechecks";
+        final String topic = "persistent://" + namespace + 
"/test_transaction_topic";
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setEncryptionRequiredStatus(namespace, true);
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<byte[]> producer = this.pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .addEncryptionKey("my-app-key")
+                
.defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+                .create();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+        producer.newMessage(txn)
+                    
.value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
+                    .send();
+        txn.commit();
+    }
 }

Reply via email to