This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c449b0450cb93ab9356f32344440d370f9249927 Author: Masahiro Sakamoto <[email protected]> AuthorDate: Wed Feb 24 18:21:42 2021 +0900 [cli] pulsar-perf uses DefaultCryptoKeyReader for E2E encryption (#9668) Currently, `pulsar-perf` has its own implementation class of `CryptoKeyReader` for end-to-end encryption of messages. Instead of this class, use `DefaultCryptoKeyReader` implemented in https://github.com/apache/pulsar/pull/9379. This change allows us to specify public and private keys by data URI as well as by file path. (cherry picked from commit aa635117bad5b9e04446fd68b7c0feecf17e51b5) --- .../pulsar/testclient/PerformanceConsumer.java | 34 ++-------------------- .../pulsar/testclient/PerformanceProducer.java | 34 ++-------------------- 2 files changed, 4 insertions(+), 64 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 9a3a371..0ee99a5 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import java.io.FileInputStream; -import java.nio.file.Files; import java.nio.file.Paths; import java.text.DecimalFormat; import java.util.List; @@ -37,8 +36,6 @@ import org.HdrHistogram.Recorder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -154,9 +151,6 @@ public class PerformanceConsumer { "--tls-allow-insecure" }, description = "Allow insecure TLS connection") public Boolean tlsAllowInsecureConnection = null; - @Parameter(names = { "-k", "--encryption-key-name" }, description = "The private key name to decrypt payload") - public String encKeyName = null; - @Parameter(names = { "-v", "--encryption-key-value-file" }, description = "The file which contains the private key to decrypt payload") public String encKeyFile = null; @@ -295,28 +289,6 @@ public class PerformanceConsumer { PulsarClient pulsarClient = clientBuilder.build(); - class EncKeyReader implements CryptoKeyReader { - - EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); - - EncKeyReader(byte[] value) { - keyInfo.setKey(value); - } - - @Override - public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { - return null; - } - - @Override - public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { - if (keyName.equals(arguments.encKeyName)) { - return keyInfo; - } - return null; - } - } - List<Future<Consumer<byte[]>>> futures = Lists.newArrayList(); ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer() // .messageListener(listener) // @@ -336,10 +308,8 @@ public class PerformanceConsumer { TimeUnit.MILLISECONDS); } - if (arguments.encKeyName != null) { - byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile)); - EncKeyReader keyReader = new EncKeyReader(pKey); - consumerBuilder.cryptoKeyReader(keyReader); + if (isNotBlank(arguments.encKeyFile)) { + consumerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); } for (int i = 0; i < arguments.numTopics; i++) { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 9153812..92dda6e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -57,8 +57,6 @@ import org.HdrHistogram.HistogramLogWriter; import org.HdrHistogram.Recorder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.EncryptionKeyInfo; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; @@ -217,32 +215,6 @@ public class PerformanceProducer { public int ioThreads = 1; } - static class EncKeyReader implements CryptoKeyReader { - - private static final long serialVersionUID = 7235317430835444498L; - - final String encKeyName; - final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); - - EncKeyReader(String encKeyName, byte[] value) { - this.encKeyName = encKeyName; - keyInfo.setKey(value); - } - - @Override - public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { - if (keyName.equals(encKeyName)) { - return keyInfo; - } - return null; - } - - @Override - public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { - return null; - } - } - public static void main(String[] args) throws Exception { final Arguments arguments = new Arguments(); @@ -471,11 +443,9 @@ public class PerformanceProducer { // Block if queue is full else we will start seeing errors in sendAsync producerBuilder.blockIfQueueFull(true); - if (arguments.encKeyName != null) { + if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) { producerBuilder.addEncryptionKey(arguments.encKeyName); - byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile)); - EncKeyReader keyReader = new EncKeyReader(arguments.encKeyName, pKey); - producerBuilder.cryptoKeyReader(keyReader); + producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); } for (int i = 0; i < arguments.numTopics; i++) {
