This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e360379cae5 [feat][client] Enable custom Encrypt Decrypt methods for Reader (#12599)
e360379cae5 is described below

commit e360379cae536924ad1aebeda17e335cb9d61d6b
Author: Shashank Rathore <[email protected]>
AuthorDate: Fri Jun 30 16:54:09 2023 +0530

    [feat][client] Enable custom Encrypt Decrypt methods for Reader (#12599)
    
    Signed-off-by: tison <[email protected]>
    Co-authored-by: shrathore <[email protected]>
    Co-authored-by: tison <[email protected]>
---
 .../apache/pulsar/client/api/ReaderBuilder.java    |  9 +++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  6 ++-
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  7 ++++
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 10 +++--
 .../client/impl/conf/ReaderConfigurationData.java  |  9 +++++
 .../apache/pulsar/client/impl/ReaderImplTest.java  | 44 ++++++++++++++++------
 6 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 7241bd4cf4f..d522c3c4b6e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -210,6 +210,15 @@ public interface ReaderBuilder<T> extends Cloneable {
      */
     ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
 
+    /**
+     * Sets a {@link MessageCrypto}.
+     *
+     * <p>Contains methods to encrypt/decrypt message for End to End Encryption.
+     *
+     * @param messageCrypto message Crypto Object
+     * @return ReaderBuilder instance
+     */
+    ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto);
     /**
      * Sets the size of the consumer receive queue.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 0f1a7429f49..bef2ccaf92e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -109,6 +109,10 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
         if (readerConfiguration.getCryptoKeyReader() != null) {
             consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
+
+        if (readerConfiguration.getMessageCrypto() != null) {
+            consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
+        }
         if (readerConfiguration.getKeyHashRanges() != null) {
             consumerConfiguration.setKeySharedPolicy(
                     KeySharedPolicy
@@ -126,7 +130,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
                 ReaderInterceptorUtil.convertToConsumerInterceptors(
                         this, readerConfiguration.getReaderInterceptorList());
         multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client, consumerConfiguration, executorProvider,
-                consumerFuture, schema,  consumerInterceptors, true,
+                consumerFuture, schema, consumerInterceptors, true,
                 readerConfiguration.getStartMessageId(),
                 readerConfiguration.getStartMessageFromRollbackDurationInSec());
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index ca2011cf18a..2860cda0cee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
@@ -173,6 +174,12 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
         return this;
     }
 
+    @Override
+    public ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto) {
+        conf.setMessageCrypto(messageCrypto);
+        return this;
+    }
+
     @Override
     public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
         checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 099098fcfab..6f8d4bcbc75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -121,11 +121,15 @@ public class ReaderImpl<T> implements Reader<T> {
             consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
         }
 
+        if (readerConfiguration.getMessageCrypto() != null) {
+            consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
+        }
+
         if (readerConfiguration.getKeyHashRanges() != null) {
             consumerConfiguration.setKeySharedPolicy(
-                KeySharedPolicy
-                    .stickyHashRange()
-                    .ranges(readerConfiguration.getKeyHashRanges())
+                    KeySharedPolicy
+                            .stickyHashRange()
+                            .ranges(readerConfiguration.getKeyHashRanges())
             );
         }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 86707d2aa2f..a038383564d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl.conf;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
 import java.util.HashSet;
@@ -26,8 +27,11 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.ReaderInterceptor;
@@ -113,6 +117,11 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
     )
     private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
 
+    @JsonIgnore
+    @Setter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP2"}))
+    @Getter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP"}))
+    private transient MessageCrypto messageCrypto = null;
+
     @ApiModelProperty(
             name = "readCompacted",
             value = "If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than a full "
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index 1bbdf274293..0349dbfe265 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -18,23 +18,27 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
 
 public class ReaderImplTest {
-    private ReaderImpl<byte[]> reader;
+    private PulsarClientImpl client;
     private ExecutorProvider executorProvider;
     private ExecutorService internalExecutor;
 
@@ -42,17 +46,15 @@ public class ReaderImplTest {
     void setupReader() {
         executorProvider = new ExecutorProvider(1, "ReaderImplTest");
         internalExecutor = Executors.newSingleThreadScheduledExecutor();
-        PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(
-                executorProvider, internalExecutor);
-        ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
-        readerConfiguration.setTopicName("topicName");
-        CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
-        reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
-                consumerFuture, Schema.BYTES);
+        client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
     }
 
     @AfterMethod
-    public void clean() {
+    public void clean() throws Exception {
+        if (client != null) {
+            client.close();
+            client = null;
+        }
         if (executorProvider != null) {
             executorProvider.shutdownNow();
             executorProvider = null;
@@ -65,6 +67,16 @@ public class ReaderImplTest {
 
     @Test
     void shouldSupportCancellingReadNextAsync() {
+        ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
+        readerConfiguration.setTopicName("topicName");
+        CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
+        ReaderImpl<byte[]> reader = new ReaderImpl<>(
+                client,
+                readerConfiguration,
+                ClientTestFixtures.createMockedExecutorProvider(),
+                consumerFuture,
+                Schema.BYTES);
+
         // given
         CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
@@ -77,4 +89,14 @@ public class ReaderImplTest {
         // then
         assertFalse(reader.getConsumer().hasNextPendingReceive());
     }
+
+    @Test
+    public void testReaderBuilderWhenMessageCryptoSet() throws PulsarClientException {
+        ReaderBuilderImpl<byte[]> builder = new ReaderBuilderImpl<>(client, Schema.BYTES);
+        builder.topic("testTopicName");
+        builder.startMessageFromRollbackDuration(1, TimeUnit.SECONDS);
+        builder.messageCrypto(new MessageCryptoBc("ctx1", true));
+        assertNotNull(builder.create());
+        assertNotNull(builder.getConf().getMessageCrypto());
+    }
 }

Reply via email to