This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ca94db5f90a3e686bed03af7689726ce12b1b10c Author: Masahiro Sakamoto <[email protected]> AuthorDate: Mon Feb 1 11:10:21 2021 +0900 Add default implementation of CryptoKeyReader (#9379) Currently, users must implement the `CryptoKeyReader` interface for end-to-end message encryption in Java. I thought it would be useful to have a default implementation, so I added `DefaultCryptoKeyReader`. How to use: ```java Producer<byte[]> producer = pulsarClient.newProducer() .topic("persistent://public/default/test") .addEncryptionKey("my-app-key1") .defaultCryptoKeyReader("file:///path/to/public.key") .create(); Consumer<byte[]> consumer = pulsarClient.newConsumer() .topic("persistent://public/default/test") .subscriptionName("sub1") .defaultCryptoKeyReader(new HashMap<String, String>() { { put("my-app-key1", "file:///path/to/private.key"); put("my-app-key2", "data:application/x-pem-file;base64,*****"); } }) .subscribe(); ``` (cherry picked from commit 85b1c7ef8a37e0bc06052f04ecd5da076804e8cb) --- .../client/api/SimpleProducerConsumerTest.java | 96 +++++++++++++++++ .../apache/pulsar/client/api/TopicReaderTest.java | 88 ++++++++++++++++ .../apache/pulsar/client/api/ConsumerBuilder.java | 24 +++++ .../apache/pulsar/client/api/ProducerBuilder.java | 24 +++++ .../apache/pulsar/client/api/ReaderBuilder.java | 24 +++++ .../pulsar/client/impl/ConsumerBuilderImpl.java | 13 +++ .../pulsar/client/impl/DefaultCryptoKeyReader.java | 107 +++++++++++++++++++ .../client/impl/DefaultCryptoKeyReaderBuilder.java | 77 ++++++++++++++ .../pulsar/client/impl/ProducerBuilderImpl.java | 13 +++ .../pulsar/client/impl/ReaderBuilderImpl.java | 15 +++ .../DefaultCryptoKeyReaderConfigurationData.java | 114 +++++++++++++++++++++ .../client/impl/ConsumerBuilderImplTest.java | 23 +++++ .../client/impl/DefaultCryptoKeyReaderTest.java | 113 ++++++++++++++++++++ .../client/impl/ProducerBuilderImplTest.java | 24 +++++ ...efaultCryptoKeyReaderConfigurationDataTest.java | 73 +++++++++++++ .../src/test/resources/crypto_ecdsa_private.key | 29 ++++++ .../src/test/resources/crypto_ecdsa_public.key | 15 +++ .../src/test/resources/crypto_rsa_private.key | 27 +++++ .../src/test/resources/crypto_rsa_public.key | 9 ++ 19 files changed, 908 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 214ec31..4db81d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -2617,6 +2617,102 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } @Test(groups = "encryption") + public void testDefaultCryptoKeyReader() throws Exception { + final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS [...] + final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQ [...] + final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem"; + final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem"; + final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu [...] + final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...] + final int numMsg = 10; + + Map<String, String> privateKeyFileMap = Maps.newHashMap(); + privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile); + privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile); + Map<String, String> privateKeyDataMap = Maps.newHashMap(); + privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData); + privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData); + + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe(); + Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub2") + .defaultCryptoKeyReader(ecdsaPrivateKeyData).subscribe(); + Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub3") + .defaultCryptoKeyReader(privateKeyFileMap).subscribe(); + Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub4") + .defaultCryptoKeyReader(privateKeyDataMap).subscribe(); + + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyData).create(); + + for (int i = 0; i < numMsg; i++) { + producer1.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg; i < numMsg * 2; i++) { + producer2.send(("my-message-" + i).getBytes()); + } + + producer1.close(); + producer2.close(); + + for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) Lists.newArrayList(consumer1, consumer2)) { + MessageImpl<byte[]> msg = null; + + for (int i = 0; i < numMsg * 2; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + } + + consumer1.unsubscribe(); + consumer2.unsubscribe(); + + Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyFile).create(); + Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyData).create(); + + for (int i = numMsg * 2; i < numMsg * 3; i++) { + producer3.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg * 3; i < numMsg * 4; i++) { + producer4.send(("my-message-" + i).getBytes()); + } + + producer3.close(); + producer4.close(); + + for (Consumer<byte[]> consumer : (List<Consumer<byte[]>>) Lists.newArrayList(consumer3, consumer4)) { + MessageImpl<byte[]> msg = null; + + for (int i = 0; i < numMsg * 4; i++) { + msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + } + + consumer3.unsubscribe(); + consumer4.unsubscribe(); + } + + @Test(groups = "encryption") public void testRedeliveryOfFailedMessages() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 7a55d2f..8c8e83a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.IOException; import java.nio.file.Files; @@ -645,6 +646,93 @@ public class TopicReaderTest extends ProducerConsumerBase { reader.close(); } + @Test(groups = "encryption") + public void testDefaultCryptoKeyReader() throws Exception { + final String topic = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader" + + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + final String ecdsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS [...] + final String ecdsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQ [...] + final String rsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-rsa.pem"; + final String rsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-rsa.pem"; + final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu [...] + final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...] + final int numMsg = 10; + + Map<String, String> privateKeyFileMap = Maps.newHashMap(); + privateKeyFileMap.put("client-ecdsa.pem", ecdsaPrivateKeyFile); + privateKeyFileMap.put("client-rsa.pem", rsaPrivateKeyFile); + Map<String, String> privateKeyDataMap = Maps.newHashMap(); + privateKeyDataMap.put("client-ecdsa.pem", ecdsaPrivateKeyData); + privateKeyDataMap.put("client-rsa.pem", rsaPrivateKeyData); + + Reader<byte[]> reader1 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).create(); + Reader<byte[]> reader2 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(ecdsaPrivateKeyData).create(); + Reader<byte[]> reader3 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(privateKeyFileMap).create(); + Reader<byte[]> reader4 = pulsarClient.newReader().topic(topic).startMessageId(MessageId.latest) + .defaultCryptoKeyReader(privateKeyDataMap).create(); + + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile).create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyData).create(); + + for (int i = 0; i < numMsg; i++) { + producer1.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg; i < numMsg * 2; i++) { + producer2.send(("my-message-" + i).getBytes()); + } + + producer1.close(); + producer2.close(); + + for (Reader<byte[]> reader : (List<Reader<byte[]>>) Lists.newArrayList(reader1, reader2)) { + for (int i = 0; i < numMsg * 2; i++) { + MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + } + + reader1.close(); + reader2.close(); + + Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyFile).create(); + Producer<byte[]> producer4 = pulsarClient.newProducer().topic(topic).addEncryptionKey("client-rsa.pem") + .defaultCryptoKeyReader(rsaPublicKeyData).create(); + + for (int i = numMsg * 2; i < numMsg * 3; i++) { + producer3.send(("my-message-" + i).getBytes()); + } + for (int i = numMsg * 3; i < numMsg * 4; i++) { + producer4.send(("my-message-" + i).getBytes()); + } + + producer3.close(); + producer4.close(); + + for (Reader<byte[]> reader : (List<Reader<byte[]>>) Lists.newArrayList(reader3, reader4)) { + for (int i = 0; i < numMsg * 4; i++) { + MessageImpl<byte[]> msg = (MessageImpl<byte[]>) reader.readNext(5, TimeUnit.SECONDS); + // verify that encrypted message contains encryption-context + msg.getEncryptionCtx().orElseThrow( + () -> new IllegalStateException("encryption-ctx not present for encrypted message")); + assertEquals(new String(msg.getData()), "my-message-" + i); + } + } + + reader3.close(); + reader4.close(); + } + @Test public void testSimpleReaderReachEndOfTopic() throws Exception { Reader<byte[]> reader = pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic") diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index c6a6555..85a4ca5 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -271,6 +271,30 @@ public interface ConsumerBuilder<T> extends Cloneable { ConsumerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader); /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKey + * the private key that is always used to decrypt message payloads. + * @return the consumer builder instance + * @since 2.8.0 + */ + ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKeys + * the map of private key names and their URIs used to decrypt message payloads. + * @return the consumer builder instance + * @since 2.8.0 + */ + ConsumerBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys); + + /** * Sets a {@link MessageCrypto}. * * <p>Contains methods to encrypt/decrypt message for End to End Encryption. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index 8c40343..62d9b20 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -335,6 +335,30 @@ public interface ProducerBuilder<T> extends Cloneable { ProducerBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader); /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to encrypt the message payloads. + * + * @param publicKey + * the public key that is always used to encrypt message payloads. + * @return the producer builder instance + * @since 2.8.0 + */ + ProducerBuilder<T> defaultCryptoKeyReader(String publicKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to encrypt the message payloads. + * + * @param publicKeys + * the map of public key names and their URIs used to encrypt message payloads. + * @return the producer builder instance + * @since 2.8.0 + */ + ProducerBuilder<T> defaultCryptoKeyReader(Map<String, String> publicKeys); + + /** * Add public encryption key, used by producer to encrypt the data key. * * <p>At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If keys are diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index 506ad86..4adf492 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -170,6 +170,30 @@ public interface ReaderBuilder<T> extends Cloneable { ReaderBuilder<T> cryptoKeyReader(CryptoKeyReader cryptoKeyReader); /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKey + * the private key that is always used to decrypt message payloads. + * @return the reader builder instance + * @since 2.8.0 + */ + ReaderBuilder<T> defaultCryptoKeyReader(String privateKey); + + /** + * Sets the default implementation of {@link CryptoKeyReader}. + * + * <p>Configure the key reader to be used to decrypt the message payloads. + * + * @param privateKeys + * the map of private key names and their URIs used to decrypt message payloads. + * @return the reader builder instance + * @since 2.8.0 + */ + ReaderBuilder<T> defaultCryptoKeyReader(Map<String, String> privateKeys); + + /** * Sets the {@link ConsumerCryptoFailureAction} to specify. * * @param action diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 8429598..629e751 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; @@ -240,6 +241,18 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { } @Override + public ConsumerBuilder<T> defaultCryptoKeyReader(String privateKey) { + checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build()); + } + + @Override + public ConsumerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> privateKeys) { + checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); + } + + @Override public ConsumerBuilder<T> messageCrypto(@NonNull MessageCrypto messageCrypto) { conf.setMessageCrypto(messageCrypto); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java new file mode 100644 index 0000000..4e8f6c3 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReader.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URLConnection; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.url.URL; +import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultCryptoKeyReader implements CryptoKeyReader { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultCryptoKeyReader.class); + + private static final String APPLICATION_X_PEM_FILE = "application/x-pem-file"; + + private String defaultPublicKey; + private String defaultPrivateKey; + + private Map<String, String> publicKeys; + private Map<String, String> privateKeys; + + public static DefaultCryptoKeyReaderBuilder builder() { + return new DefaultCryptoKeyReaderBuilder(); + } + + DefaultCryptoKeyReader(DefaultCryptoKeyReaderConfigurationData conf) { + this.defaultPublicKey = conf.getDefaultPublicKey(); + this.defaultPrivateKey = conf.getDefaultPrivateKey(); + this.publicKeys = conf.getPublicKeys(); + this.privateKeys = conf.getPrivateKeys(); + } + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + String publicKey = publicKeys.getOrDefault(keyName, defaultPublicKey); + + if (publicKey == null) { + LOG.warn("Public key named {} is not set", keyName); + } else { + try { + keyInfo.setKey(loadKey(publicKey)); + } catch (Exception e) { + LOG.error("Failed to load public key named {}", keyName, e); + } + } + + return keyInfo; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) { + EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + String privateKey = privateKeys.getOrDefault(keyName, defaultPrivateKey); + + if (privateKey == null) { + LOG.warn("Private key named {} is not set", keyName); + } else { + try { + keyInfo.setKey(loadKey(privateKey)); + } catch (Exception e) { + LOG.error("Failed to load private key named {}", keyName, e); + } + } + + return keyInfo; + } + + private byte[] loadKey(String keyUrl) throws IOException, IllegalAccessException, InstantiationException { + try { + URLConnection urlConnection = new URL(keyUrl).openConnection(); + String protocol = urlConnection.getURL().getProtocol(); + if ("data".equals(protocol) && !APPLICATION_X_PEM_FILE.equals(urlConnection.getContentType())) { + throw new IllegalArgumentException( + "Unsupported media type or encoding format: " + urlConnection.getContentType()); + } + return IOUtils.toByteArray(urlConnection); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid key format"); + } + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java new file mode 100644 index 0000000..4d98a01 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderBuilder.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.pulsar.client.impl.conf.DefaultCryptoKeyReaderConfigurationData; + +public class DefaultCryptoKeyReaderBuilder implements Cloneable { + + private DefaultCryptoKeyReaderConfigurationData conf; + + DefaultCryptoKeyReaderBuilder() { + this(new DefaultCryptoKeyReaderConfigurationData()); + } + + DefaultCryptoKeyReaderBuilder(DefaultCryptoKeyReaderConfigurationData conf) { + this.conf = conf; + } + + public DefaultCryptoKeyReaderBuilder defaultPublicKey(String defaultPublicKey) { + conf.setDefaultPublicKey(defaultPublicKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder defaultPrivateKey(String defaultPrivateKey) { + conf.setDefaultPrivateKey(defaultPrivateKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder publicKey(String keyName, String publicKey) { + conf.setPublicKey(keyName, publicKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder privateKey(String keyName, String privateKey) { + conf.setPrivateKey(keyName, privateKey); + return this; + } + + public DefaultCryptoKeyReaderBuilder publicKeys(Map<String, String> publicKeys) { + conf.getPublicKeys().putAll(publicKeys); + return this; + } + + public DefaultCryptoKeyReaderBuilder privateKeys(Map<String, String> privateKeys) { + conf.getPrivateKeys().putAll(privateKeys); + return this; + } + + public DefaultCryptoKeyReader build() { + return new DefaultCryptoKeyReader(conf); + } + + @Override + public DefaultCryptoKeyReaderBuilder clone() { + return new DefaultCryptoKeyReaderBuilder(conf.clone()); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 64a22d7..2ae20c4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.apache.pulsar.client.api.interceptor.ProducerInterceptorWrapper; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.util.FutureUtil; @@ -202,6 +203,18 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> { } @Override + public ProducerBuilder<T> defaultCryptoKeyReader(String publicKey) { + checkArgument(StringUtils.isNotBlank(publicKey), "publicKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPublicKey(publicKey).build()); + } + + @Override + public ProducerBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> publicKeys) { + checkArgument(!publicKeys.isEmpty(), "publicKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().publicKeys(publicKeys).build()); + } + + @Override public ProducerBuilder<T> addEncryptionKey(String key) { checkArgument(StringUtils.isNotBlank(key), "Encryption key cannot be blank"); conf.getEncryptionKeys().add(key); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 1c62a47..e4f02b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; import lombok.AccessLevel; import lombok.Getter; import com.google.common.base.Preconditions; +import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.common.util.FutureUtil; @@ -141,6 +144,18 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> { } @Override + public ReaderBuilder<T> defaultCryptoKeyReader(String privateKey) { + checkArgument(StringUtils.isNotBlank(privateKey), "privateKey cannot be blank"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().defaultPrivateKey(privateKey).build()); + } + + @Override + public ReaderBuilder<T> defaultCryptoKeyReader(@NonNull Map<String, String> privateKeys) { + checkArgument(!privateKeys.isEmpty(), "privateKeys cannot be empty"); + return cryptoKeyReader(DefaultCryptoKeyReader.builder().privateKeys(privateKeys).build()); + } + + @Override public ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action) { conf.setCryptoFailureAction(action); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java new file mode 100644 index 0000000..9db9099 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationData.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DefaultCryptoKeyReaderConfigurationData implements Serializable, Cloneable { + + private static final long serialVersionUID = 1L; + + private static final String TO_STRING_FORMAT = "%s(defaultPublicKey=%s, defaultPrivateKey=%s, publicKeys=%s, privateKeys=%s)"; + + @NonNull + private String defaultPublicKey; + @NonNull + private String defaultPrivateKey; + + @NonNull + private Map<String, String> publicKeys = new HashMap<>(); + @NonNull + private Map<String, String> privateKeys = new HashMap<>(); + + public void setPublicKey(@NonNull String keyName, @NonNull String publicKey) { + publicKeys.put(keyName, publicKey); + } + + public void setPrivateKey(@NonNull String keyName, @NonNull String privateKey) { + privateKeys.put(keyName, privateKey); + } + + @Override + public DefaultCryptoKeyReaderConfigurationData clone() { + DefaultCryptoKeyReaderConfigurationData clone = new DefaultCryptoKeyReaderConfigurationData(); + + if (defaultPublicKey != null) { + clone.setDefaultPublicKey(defaultPublicKey); + } + + if (defaultPrivateKey != null) { + clone.setDefaultPrivateKey(defaultPrivateKey); + } + + if (publicKeys != null) { + clone.setPublicKeys(new HashMap<String, String>(publicKeys)); + } + + if (privateKeys != null) { + clone.setPrivateKeys(new HashMap<String, String>(privateKeys)); + } + + return clone; + } + + @Override + public String toString() { + return String.format(TO_STRING_FORMAT, getClass().getSimpleName(), maskKeyData(defaultPublicKey), + maskKeyData(defaultPrivateKey), maskKeyData(publicKeys), maskKeyData(privateKeys)); + } + + private static String maskKeyData(Map<String, String> keys) { + if (keys == null) { + return "null"; + } else { + StringBuilder keysStr = new StringBuilder(); + keysStr.append("{"); + + List<String> kvList = new ArrayList<>(); + keys.forEach((k, v) -> kvList.add(k + "=" + maskKeyData(v))); + keysStr.append(String.join(", ", kvList)); + + keysStr.append("}"); + return keysStr.toString(); + } + } + + private static String maskKeyData(String key) { + if (key == null) { + return "null"; + } else if (key.startsWith("data:")) { + return "data:*****"; + } else { + return key; + } + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index 02d77a7..b2ce170 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -121,6 +121,29 @@ public class ConsumerBuilderImplTest { .cryptoKeyReader(null); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader((String) null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName").defaultCryptoKeyReader(""); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader((Map<String, String>) null); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testConsumerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() { + consumerBuilderImpl.topic(TOPIC_NAME).subscriptionName("subscriptionName") + .defaultCryptoKeyReader(new HashMap<String, String>()); + } + @Test(expectedExceptions = NullPointerException.class) public void testConsumerBuilderImplWhenCryptoFailureActionIsNull() { consumerBuilderImpl.topic(TOPIC_NAME) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java new file mode 100644 index 0000000..f1e4cce --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/DefaultCryptoKeyReaderTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; + +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.testng.annotations.Test; + +public class DefaultCryptoKeyReaderTest { + + @Test + public void testBuild() throws Exception { + Map<String, String> publicKeys = new HashMap<>(); + publicKeys.put("key1", "file:///path/to/public1.key"); + publicKeys.put("key2", "file:///path/to/public2.key"); + + Map<String, String> privateKeys = new HashMap<>(); + privateKeys.put("key3", "file:///path/to/private3.key"); + + DefaultCryptoKeyReader keyReader = DefaultCryptoKeyReader.builder() + .defaultPublicKey("file:///path/to/default-public.key") + .defaultPrivateKey("file:///path/to/default-private.key") + .publicKey("key4", "file:///path/to/public4.key").publicKeys(publicKeys) + .publicKey("key5", "file:///path/to/public5.key").privateKey("key6", "file:///path/to/private6.key") + .privateKeys(privateKeys).privateKey("key7", "file:///path/to/private7.key").build(); + + Field defaultPublicKeyField = keyReader.getClass().getDeclaredField("defaultPublicKey"); + defaultPublicKeyField.setAccessible(true); + Field defaultPrivateKeyField = keyReader.getClass().getDeclaredField("defaultPrivateKey"); + defaultPrivateKeyField.setAccessible(true); + Field publicKeysField = keyReader.getClass().getDeclaredField("publicKeys"); + publicKeysField.setAccessible(true); + Field privateKeysField = keyReader.getClass().getDeclaredField("privateKeys"); + privateKeysField.setAccessible(true); + + Map<String, String> expectedPublicKeys = new HashMap<>(); + expectedPublicKeys.put("key1", "file:///path/to/public1.key"); + expectedPublicKeys.put("key2", "file:///path/to/public2.key"); + expectedPublicKeys.put("key4", "file:///path/to/public4.key"); + expectedPublicKeys.put("key5", "file:///path/to/public5.key"); + + Map<String, String> expectedPrivateKeys = new HashMap<>(); + expectedPrivateKeys.put("key3", "file:///path/to/private3.key"); + expectedPrivateKeys.put("key6", "file:///path/to/private6.key"); + expectedPrivateKeys.put("key7", "file:///path/to/private7.key"); + + assertEquals((String) defaultPublicKeyField.get(keyReader), "file:///path/to/default-public.key"); + assertEquals((String) defaultPrivateKeyField.get(keyReader), "file:///path/to/default-private.key"); + assertEquals((Map<String, String>) publicKeysField.get(keyReader), expectedPublicKeys); + assertEquals((Map<String, String>) privateKeysField.get(keyReader), expectedPrivateKeys); + } + + @Test + public void testGetKeys() throws Exception { + final String ecdsaPublicKey = "./src/test/resources/crypto_ecdsa_public.key"; + final String ecdsaPrivateKey = "./src/test/resources/crypto_ecdsa_private.key"; + final String rsaPublicKey = "./src/test/resources/crypto_rsa_public.key"; + final String rsaPrivateKey = "./src/test/resources/crypto_rsa_private.key"; + + DefaultCryptoKeyReader keyReader1 = DefaultCryptoKeyReader.builder().build(); + assertNull(keyReader1.getPublicKey("key0", null).getKey()); + assertNull(keyReader1.getPrivateKey("key0", null).getKey()); + + DefaultCryptoKeyReader keyReader2 = DefaultCryptoKeyReader.builder().defaultPublicKey("file:" + ecdsaPublicKey) + .defaultPrivateKey("file:" + ecdsaPrivateKey) + .publicKey("key1", + "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF6elRUenNTc1pGWWxXeWJack1OdwphRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFIb1U1eStjY0RoeXhCR0NLUFprVGNRYXN2WWdXSjNzSFJLQWxOCmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkE4VHF5Umt4eGw1dnBBTWM2OVVqWDlIUHdTemxtckM3WlhtMWUKU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVWFoaFJuOHcyRmtzOUpXcmlOS2kxUFNnQ1BqTWpnS0JGN3lhRQpBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsRXhSMU1EaVZW [...] + .privateKey("key1", + "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFcEFJQkFBS0NBUUVBenpUVHpzU3NaRllsV3liWnJNTndhRGpncWluSU5vNXlOa0h1UkJQZzJyNTZCRWFICm9VNXkrY2NEaHl4QkdDS1Baa1RjUWFzdllnV0ozc0hSS0FsTmRaTkc4R3QzazJTcmZEcnJ0ajFLTDNHNk5XUkEKOFRxeVJreHhsNXZwQU1jNjlValg5SFB3U3psbXJDN1pYbTFlU3dZVFY3Kzdxcy82OUpMQm5yTUpjc2wrSXlYVQphaGhSbjh3MkZrczlKV3JpTktpMVBTZ0NQak1qZ0tCRjd5YUVBVEowR01TTWM4RnZYV3dGSnNXQldRa1V3Z3FsCkV4UjFNRGlWVkJ0dzlRdEpCMjlJTmkwTkRzMlBlYjYx [...] + .publicKey("key2", "file:invalid").privateKey("key2", "file:invalid").publicKey("key3", "data:invalid") + .privateKey("key3", "data:invalid").build(); + + assertNotNull(keyReader2.getPublicKey("key0", null).getKey()); + assertEquals(keyReader2.getPublicKey("key0", null).getKey(), Files.readAllBytes(Paths.get(ecdsaPublicKey))); + assertNotNull(keyReader2.getPrivateKey("key0", null).getKey()); + assertEquals(keyReader2.getPrivateKey("key0", null).getKey(), Files.readAllBytes(Paths.get(ecdsaPrivateKey))); + + assertNotNull(keyReader2.getPublicKey("key1", null).getKey()); + assertEquals(keyReader2.getPublicKey("key1", null).getKey(), Files.readAllBytes(Paths.get(rsaPublicKey))); + assertNotNull(keyReader2.getPrivateKey("key1", null).getKey()); + assertEquals(keyReader2.getPrivateKey("key1", null).getKey(), Files.readAllBytes(Paths.get(rsaPrivateKey))); + + assertNull(keyReader1.getPublicKey("key2", null).getKey()); + assertNull(keyReader1.getPrivateKey("key2", null).getKey()); + assertNull(keyReader1.getPublicKey("key3", null).getKey()); + assertNull(keyReader1.getPrivateKey("key3", null).getKey()); + } + +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java index 1031f02..b3cda2e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerBuilderImplTest.java @@ -195,6 +195,30 @@ public class ProducerBuilderImplTest { } @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullString() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((String) null).create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyString() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader("").create(); + } + + @Test(expectedExceptions = NullPointerException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsNullMap() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader((Map<String, String>) null).create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testProducerBuilderImplWhenDefaultCryptoKeyReaderIsEmptyMap() throws PulsarClientException { + producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); + producerBuilderImpl.topic(TOPIC_NAME).defaultCryptoKeyReader(new HashMap<String, String>()).create(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) public void testProducerBuilderImplWhenEncryptionKeyIsNull() throws PulsarClientException { producerBuilderImpl = new ProducerBuilderImpl(client, Schema.BYTES); producerBuilderImpl.topic(TOPIC_NAME) diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java new file mode 100644 index 0000000..cb38ea5 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/DefaultCryptoKeyReaderConfigurationDataTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.testng.annotations.Test; + +public class DefaultCryptoKeyReaderConfigurationDataTest { + + @Test + public void testClone() throws Exception { + DefaultCryptoKeyReaderConfigurationData conf = new DefaultCryptoKeyReaderConfigurationData(); + conf.setDefaultPublicKey("file:///path/to/default-public.key"); + conf.setDefaultPrivateKey("file:///path/to/default-private.key"); + conf.setPublicKey("key1", "file:///path/to/public1.key"); + conf.setPrivateKey("key2", "file:///path/to/private2.key"); + DefaultCryptoKeyReaderConfigurationData clone = conf.clone(); + + conf.setDefaultPublicKey("data:AAAAA"); + conf.setDefaultPrivateKey("data:BBBBB"); + conf.setPublicKey("key1", "data:CCCCC"); + conf.setPrivateKey("key2", "data:DDDDD"); + + assertEquals(clone.getDefaultPublicKey(), "file:///path/to/default-public.key"); + assertEquals(clone.getDefaultPrivateKey(), "file:///path/to/default-private.key"); + assertEquals(clone.getPublicKeys().get("key1"), "file:///path/to/public1.key"); + assertEquals(clone.getPrivateKeys().get("key2"), "file:///path/to/private2.key"); + } + + @Test + public void testToString() throws Exception { + DefaultCryptoKeyReaderConfigurationData conf = new DefaultCryptoKeyReaderConfigurationData(); + assertEquals(conf.toString(), + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=null, defaultPrivateKey=null, publicKeys={}, privateKeys={})"); + + conf.setDefaultPublicKey("file:///path/to/default-public.key"); + conf.setDefaultPrivateKey("data:AAAAA"); + conf.setPublicKey("key1", "file:///path/to/public.key"); + conf.setPrivateKey("key2", "file:///path/to/private.key"); + assertEquals(conf.toString(), + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key, defaultPrivateKey=data:*****, publicKeys={key1=file:///path/to/public.key}, privateKeys={key2=file:///path/to/private.key})"); + + conf.setPublicKey("key3", "data:BBBBB"); + conf.setPrivateKey("key4", "data:CCCCC"); + assertTrue(conf.toString().startsWith( + "DefaultCryptoKeyReaderConfigurationData(defaultPublicKey=file:///path/to/default-public.key, defaultPrivateKey=data:*****, publicKeys={")); + assertTrue(conf.toString().contains("key3=data:*****")); + assertFalse(conf.toString().contains("key3=data:BBBBB")); + assertTrue(conf.toString().contains("key4=data:*****")); + assertFalse(conf.toString().contains("key4=data:CCCCC")); + assertTrue(conf.toString().endsWith("})")); + } + +} diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_private.key b/pulsar-client/src/test/resources/crypto_ecdsa_private.key new file mode 100644 index 0000000..036a1e7 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_ecdsa_private.key @@ -0,0 +1,29 @@ +-----BEGIN EC PARAMETERS----- +MIIBwgIBATBNBgcqhkjOPQEBAkIB//////////////////////////////////// +//////////////////////////////////////////////////8wgZ4EQgH///// +//////////////////////////////////////////////////////////////// +/////////////////ARBUZU+uWGOHJofkpohoLaFQO6i2nJbmbMV87i0iZGO8Qnh +Vhk5Uex+k3sWUsC9O7G/BzVz34g9LDTx70Uf1GtQPwADFQDQnogAKRy4U5bMZxc5 +MoSqoNpkugSBhQQAxoWOBrcEBOnNnj7LZiOVtEKcZIE5BT+1Ifgor2BrTT26oUte +d+/nWSj+HcEnov+o3jNIs8GFakKb+X5+McLlvWYBGDkpaniaO8AEXIpftCx9G9mY +9URJV5tEaBevvRcnPmYsl+5ymV70JkDFULkBP60HYTU8cIaicsJAiL6Udp/RZlAC +QgH///////////////////////////////////////////pRhoeDvy+Wa3/MAUj3 +CaXQO7XJuImcR667b7cekThkCQIBAQ== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MIICnQIBAQRCAPvNzZPrKUZao8oLRet7MUTfa9TUEJYw2C4TyMSP54YWzyoQsGJJ +FQVGsJJWBvmEDTfU5zU6d+vvbLVWy7R0aMDXoIIBxjCCAcICAQEwTQYHKoZIzj0B +AQJCAf////////////////////////////////////////////////////////// +////////////////////////////MIGeBEIB//////////////////////////// +//////////////////////////////////////////////////////////wEQVGV +PrlhjhyaH5KaIaC2hUDuotpyW5mzFfO4tImRjvEJ4VYZOVHsfpN7FlLAvTuxvwc1 +c9+IPSw08e9FH9RrUD8AAxUA0J6IACkcuFOWzGcXOTKEqqDaZLoEgYUEAMaFjga3 +BATpzZ4+y2YjlbRCnGSBOQU/tSH4KK9ga009uqFLXnfv51ko/h3BJ6L/qN4zSLPB +hWpCm/l+fjHC5b1mARg5KWp4mjvABFyKX7QsfRvZmPVESVebRGgXr70XJz5mLJfu +cple9CZAxVC5AT+tB2E1PHCGonLCQIi+lHaf0WZQAkIB//////////////////// +///////////////////////6UYaHg78vlmt/zAFI9wml0Du1ybiJnEeuu2+3HpE4 +ZAkCAQGhgYkDgYYABAFqUEjls03bQowJQUSnTiqzTvdXE26NN891SHoDx2wqfRg2 +OSjzI1bQcBT2GARfut0/tVDbx3gX8qmaPEY9un3dugHSEgX209yLhHuXfIJKZm6Y +indL6RxwL6Wdqv9bsHUi7dSXTBIc60C0a6gxzhX0I9P4g+Og8hrsW0Hmmm1zJQ8x +gQ== +-----END EC PRIVATE KEY----- diff --git a/pulsar-client/src/test/resources/crypto_ecdsa_public.key b/pulsar-client/src/test/resources/crypto_ecdsa_public.key new file mode 100644 index 0000000..941afeb --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_ecdsa_public.key @@ -0,0 +1,15 @@ +-----BEGIN PUBLIC KEY----- +MIICXDCCAc8GByqGSM49AgEwggHCAgEBME0GByqGSM49AQECQgH///////////// +//////////////////////////////////////////////////////////////// +/////////zCBngRCAf////////////////////////////////////////////// +///////////////////////////////////////8BEFRlT65YY4cmh+SmiGgtoVA +7qLacluZsxXzuLSJkY7xCeFWGTlR7H6TexZSwL07sb8HNXPfiD0sNPHvRR/Ua1A/ +AAMVANCeiAApHLhTlsxnFzkyhKqg2mS6BIGFBADGhY4GtwQE6c2ePstmI5W0Qpxk +gTkFP7Uh+CivYGtNPbqhS1537+dZKP4dwSei/6jeM0izwYVqQpv5fn4xwuW9ZgEY +OSlqeJo7wARcil+0LH0b2Zj1RElXm0RoF6+9Fyc+ZiyX7nKZXvQmQMVQuQE/rQdh +NTxwhqJywkCIvpR2n9FmUAJCAf////////////////////////////////////// +////+lGGh4O/L5Zrf8wBSPcJpdA7tcm4iZxHrrtvtx6ROGQJAgEBA4GGAAQBalBI +5bNN20KMCUFEp04qs073VxNujTfPdUh6A8dsKn0YNjko8yNW0HAU9hgEX7rdP7VQ +28d4F/KpmjxGPbp93boB0hIF9tPci4R7l3yCSmZumIp3S+kccC+lnar/W7B1Iu3U +l0wSHOtAtGuoMc4V9CPT+IPjoPIa7FtB5pptcyUPMYE= +-----END PUBLIC KEY----- diff --git a/pulsar-client/src/test/resources/crypto_rsa_private.key b/pulsar-client/src/test/resources/crypto_rsa_private.key new file mode 100644 index 0000000..5177bb9 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_rsa_private.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAzzTTzsSsZFYlWybZrMNwaDjgqinINo5yNkHuRBPg2r56BEaH +oU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlNdZNG8Gt3k2SrfDrrtj1KL3G6NWRA +8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1eSwYTV7+7qs/69JLBnrMJcsl+IyXU +ahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaEATJ0GMSMc8FvXWwFJsWBWQkUwgql +ExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49NZlN/klJCXIUtBSIqg9o+iRKVwXL +HnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ728wIDAQABAoIBAHWFvfiRnkGOhsOL +ZnzRoMjMMcjxx8gBxQ+3F1/vcmI/FM+l/TllWFsJIJwjYoxA1dqohdCNOmO7RnZc +sbemhxN/xAWKvpiPyZ+9f4Gug4wjUf0Ebr0jkIfExcy6tk4lse/7L9lLhOf1l6Fj +NRCUsZ2VxZTIf7WjHvBm6ICNhXdfgc/TOX/H4BBMxyQkkmvS7yQHPmnekVpCjwXi +RgdPOpBSHUAsuLc3cdO7DzSlXBy+R3UB5bC97efSTwxmMdcGUNQh17Cuw+ou2OLJ +l/WyMBJgKP0zp8MI2YCP0toE1VV0FWisiNUdyw2mmdsKBPCtZW4Jf/avRLjCpy83 +gyeHi4kCgYEA7fac8u/WoUcRxfwbexxVNIb5jpVgQ22XXUvcOBA3145HFH4kD9Qs +OlOM48iEX1Gq+FOb+TkVa3yeEbyEHVmNxm7ZqDElGlPnHHgWJfVo4ltemkNQ8cQI +CiDhUH7D9iGd4TrLq+u8JY/oy0dpJyaJ/GsNPwjVzMiA9kDuI2KKRpcCgYEA3ulp +sZy82EibT+YYNshfAyzPcQc+CVdcpS3yEUMcMrrGT2IHbAhldzAWvni7PF1CvYoB +eoPSGiEGMAwAfaUI4psZvATZTf+GWKizlH820lw4tY2NW2tYFwDcZ6EPKdq9ZCOz +Lfyw2NhLqi2FpFyApcXlA92UUI0FcXCCtAKJ2gUCgYEAsI5merVKiNTDMiNQfHJU +Ian3pTvddYntYXJ0jUAzPoK46FKDDR9+RTRSd3sCA/sDIEZglnQtGVgXq880MtaM +I2uBoJH+Fl+kPPI4dKd1z2S9dzV07DxnPqSQp/m2CXt9uWu3S/KW4UO6FIECWuL0 +RE1lQZybjNpDHPKl/akSMTcCgYBqC80WjCRjGJeauTJHzac10XmWogVnWEJg6qzA +fZbKo4R4e4BgatYqj5wiXTlmDFAV77OoX1Hy0EcrUGpjW8IQXA0wH3Zp3uhBAXD9 +ck/YX7sy0/atyTGNMAGq4zpdhQyYuUsi05Ymcy/789AiU0d4ld7PqfhHIe+2+fmU +PajrKQKBgQCjpj0Y+E/Hi0SJUKNVPl3u+SN1lLcR1/gMjJbsYFGua1MsLXBNXTSl +QiYHiadT7BhPEkFdW7u+bFws2admW19BoUb+wgtZT/t7nTyoS4Lag4vyazNPZzdQ +x6XPrwZimd0XDDitGLjcLf991EFsXSqPznDDfXtJ31+oU6ObnHSItA== +-----END RSA PRIVATE KEY----- diff --git a/pulsar-client/src/test/resources/crypto_rsa_public.key b/pulsar-client/src/test/resources/crypto_rsa_public.key new file mode 100644 index 0000000..6982804 --- /dev/null +++ b/pulsar-client/src/test/resources/crypto_rsa_public.key @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzzTTzsSsZFYlWybZrMNw +aDjgqinINo5yNkHuRBPg2r56BEaHoU5y+ccDhyxBGCKPZkTcQasvYgWJ3sHRKAlN +dZNG8Gt3k2SrfDrrtj1KL3G6NWRA8TqyRkxxl5vpAMc69UjX9HPwSzlmrC7ZXm1e +SwYTV7+7qs/69JLBnrMJcsl+IyXUahhRn8w2Fks9JWriNKi1PSgCPjMjgKBF7yaE +ATJ0GMSMc8FvXWwFJsWBWQkUwgqlExR1MDiVVBtw9QtJB29INi0NDs2Peb61Dt49 +NZlN/klJCXIUtBSIqg9o+iRKVwXLHnIM0WHVnmRn2MK0nf03/DwCIVnb5elToZ72 +8wIDAQAB +-----END PUBLIC KEY-----
