jbampton commented on a change in pull request #8962: URL: https://github.com/apache/pulsar/pull/8962#discussion_r543371155
########## File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws 
PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); Review comment: ```suggestion String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis(); ``` ########## File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + 
Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + 
} + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). 
+ * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order Review comment: ```suggestion Set<String> messages = new HashSet();// Since messages are in random order ``` ########## File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ 
-0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import 
org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void 
testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> 
producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + } + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final 
String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. 
+ * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). + * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int 
numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { + producer.send((message + i).getBytes()); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + Message m = consumer2.receive(3, TimeUnit.SECONDS); + assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + for (int i = 0; i<numberOfMessages; i++) { Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ########## File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + 
Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + 
} + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). 
+ * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order Review comment: ```suggestion Set<String> messages = new HashSet();// Since messages are in random order ``` ########## File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ 
-0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import 
org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void 
testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); Review comment: ```suggestion String topicName = "persistent://my-property/my-ns/myrsa-topic1-" + System.currentTimeMillis(); ``` ########## File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + 
Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + 
} + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). 
+ * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ########## File path: 
tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import 
org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws 
PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + } + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. 
+ assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). + * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. 
+ * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { + producer.send((message + i).getBytes()); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + Message m = consumer2.receive(3, TimeUnit.SECONDS); + assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + for (int i = 0; i<numberOfMessages; i++) { 
Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ########## File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + 
Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + 
} + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). 
+ * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ########## File path: tests/pulsar-client-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java 
########## @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; 
+import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + @BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void 
testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> 
producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + } + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final 
String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. 
+ * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). + * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int 
numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { + producer.send((message + i).getBytes()); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + Message m = consumer2.receive(3, TimeUnit.SECONDS); + assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + for (int i = 0; i<numberOfMessages; i++) { + // All messages would be received by consumer 1 + m = consumer1.receive(); + messages.add(new String(m.getData())); + consumer1.acknowledge(m); + } + + // Consuming from consumer 2 and 3 again just to be sure + // no message should be returned since they can't decrypt the message + m = consumer2.receive(3, TimeUnit.SECONDS); + assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + // checking if all messages were received + for (int i = 0; i<numberOfMessages; i++) { Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ########## File path: tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java ########## @@ -0,0 +1,577 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration; + +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; +import org.apache.pulsar.common.api.EncryptionContext; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.compression.CompressionCodec; +import org.apache.pulsar.common.compression.CompressionCodecProvider; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.com.google.common.collect.Maps; +import org.apache.pulsar.shade.com.google.common.collect.Sets; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.Unpooled; +import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.Security; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.*; + +public class SimpleProducerConsumerTest { + private static final Logger log = LoggerFactory.getLogger(SimpleProducerConsumerTest.class); + + private PulsarContainer pulsarContainer; + private URI lookupUrl; + private PulsarClient pulsarClient; + + 
@BeforeClass + public void setup() throws PulsarClientException, URISyntaxException, PulsarAdminException { + Security.addProvider(new org.bouncycastle.jce.provider.BouncyCastleProvider()); + + pulsarContainer = new PulsarContainer(); + pulsarContainer.start(); + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + lookupUrl = new URI(pulsarContainer.getPlainTextPulsarBrokerUrl()); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarContainer.getPulsarAdminUrl()).build(); + admin.tenants().createTenant("my-property", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("standalone"))); + admin.namespaces().createNamespace("my-property/my-ns"); + admin.namespaces().setNamespaceReplicationClusters("my-property/my-ns", Sets.newHashSet("standalone")); + admin.close(); + } + + @AfterClass + public void cleanup() throws PulsarClientException { + pulsarClient.close(); + pulsarContainer.stop(); + pulsarContainer.close(); + } + + private PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException { + return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build(); + } + + @Test + public void testRSAEncryption() throws Exception { + + String topicName = "persistent://my-property/my-ns/myrsa-topic1-"+ System.currentTimeMillis(); + + class EncKeyReader implements CryptoKeyReader { + + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." 
+ keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + final int totalMsg = 10; + + Set<String> messageSet = Sets.newHashSet(); + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1") + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()).subscribe(); + Consumer<byte[]> normalConsumer = pulsarClient.newConsumer() + .topic(topicName).subscriptionName("my-subscriber-name-normal") + .subscribe(); + + Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") + .addEncryptionKey("client-rsa.pem").cryptoKeyReader(new EncKeyReader()).create(); + + for (int i = 0; i < totalMsg; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + for (int i = totalMsg; i < totalMsg * 2; i++) { + String message = "my-message-" + i; + producer2.send(message.getBytes()); + 
} + + MessageImpl<byte[]> msg = null; + + msg = (MessageImpl<byte[]>) normalConsumer.receive(500, TimeUnit.MILLISECONDS); + // should not able to read message using normal message. + assertNull(msg); + + for (int i = 0; i < totalMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx() + .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message")); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + } + + protected <T> void testMessageOrderAndDuplicates(Set<T> messagesReceived, T receivedMessage, + T expectedMessage) { + // Make sure that messages are received in order + Assert.assertEquals(receivedMessage, expectedMessage, + "Received message " + receivedMessage + " did not match the expected message " + expectedMessage); + + // Make sure that there are no duplicates + Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); + } + + @Test + public void testRedeliveryOfFailedMessages() throws Exception { + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarContainer.getPlainTextPulsarBrokerUrl()) + .build(); + + final String encryptionKeyName = "client-rsa.pem"; + final String encryptionKeyVersion = "1.0"; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("version", encryptionKeyVersion); + class EncKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName; + if (Files.isReadable(Paths.get(CERT_FILE_PATH))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH))); + keyInfo.setMetadata(metadata); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + CERT_FILE_PATH); + } + } else { + Assert.fail("Certificate file " + CERT_FILE_PATH + " is not present or not readable."); + } + return null; + } + } + + class InvalidKeyReader implements CryptoKeyReader { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + return null; + } + } + + /* + * Redelivery functionality guarantees that customer will get a chance to process the message again. + * In case of shared subscription eventually every client will get a chance to process the message, till one of them acks it. + * + * For client with Encryption enabled where in cases like a new production rollout or a buggy client configuration, we might have a mismatch of consumers + * - few which can decrypt, few which can't (due to errors or cryptoReader not configured). 
+ * + * In that case eventually all messages should be acked as long as there is a single consumer who can decrypt the message. + * + * Consumer 1 - Can decrypt message + * Consumer 2 - Has invalid Reader configured. + * Consumer 3 - Has no reader configured. + * + */ + + String topicName = "persistent://my-property/my-ns/myrsa-topic2"; + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) + .cryptoKeyReader(new EncKeyReader()).create(); + + PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new EncKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").cryptoKeyReader(new InvalidKeyReader()) + .subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topicsPattern(topicName) + .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1, TimeUnit.SECONDS).subscribe(); + + int numberOfMessages = 100; + String message = "my-message"; + Set<String> messages = new HashSet(); // Since messages are in random order + for (int i = 0; i<numberOfMessages; i++) { + producer.send((message + i).getBytes()); + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + Message m = consumer2.receive(3, TimeUnit.SECONDS); + 
assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + for (int i = 0; i<numberOfMessages; i++) { + // All messages would be received by consumer 1 + m = consumer1.receive(); + messages.add(new String(m.getData())); + consumer1.acknowledge(m); + } + + // Consuming from consumer 2 and 3 again just to be sure + // no message should be returned since they can't decrypt the message + m = consumer2.receive(3, TimeUnit.SECONDS); + assertNull(m); + m = consumer3.receive(3, TimeUnit.SECONDS); + assertNull(m); + + // checking if all messages were received + for (int i = 0; i<numberOfMessages; i++) { Review comment: ```suggestion for (int i = 0; i < numberOfMessages; i++) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
