This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e360379cae5 [feat][client] Enable custom Encrypt Decrypt methods for Reader (#12599)
e360379cae5 is described below
commit e360379cae536924ad1aebeda17e335cb9d61d6b
Author: Shashank Rathore <[email protected]>
AuthorDate: Fri Jun 30 16:54:09 2023 +0530
[feat][client] Enable custom Encrypt Decrypt methods for Reader (#12599)
Signed-off-by: tison <[email protected]>
Co-authored-by: shrathore <[email protected]>
Co-authored-by: tison <[email protected]>
---
.../apache/pulsar/client/api/ReaderBuilder.java | 9 +++++
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 6 ++-
.../pulsar/client/impl/ReaderBuilderImpl.java | 7 ++++
.../org/apache/pulsar/client/impl/ReaderImpl.java | 10 +++--
.../client/impl/conf/ReaderConfigurationData.java | 9 +++++
.../apache/pulsar/client/impl/ReaderImplTest.java | 44 ++++++++++++++++------
6 files changed, 70 insertions(+), 15 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index 7241bd4cf4f..d522c3c4b6e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -210,6 +210,15 @@ public interface ReaderBuilder<T> extends Cloneable {
*/
ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
+ /**
+ * Sets a {@link MessageCrypto}.
+ *
+ * <p>Contains methods to encrypt/decrypt message for End to End Encryption.
+ *
+ * @param messageCrypto message Crypto Object
+ * @return ReaderBuilder instance
+ */
+ ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto);
/**
* Sets the size of the consumer receive queue.
*
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 0f1a7429f49..bef2ccaf92e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -109,6 +109,10 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
if (readerConfiguration.getCryptoKeyReader() != null) {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
+
+ if (readerConfiguration.getMessageCrypto() != null) {
+ consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
+ }
if (readerConfiguration.getKeyHashRanges() != null) {
consumerConfiguration.setKeySharedPolicy(
KeySharedPolicy
@@ -126,7 +130,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
ReaderInterceptorUtil.convertToConsumerInterceptors(
this, readerConfiguration.getReaderInterceptorList());
multiTopicsConsumer = new MultiTopicsConsumerImpl<>(client,
consumerConfiguration, executorProvider,
- consumerFuture, schema, consumerInterceptors, true,
+ consumerFuture, schema, consumerInterceptors, true,
readerConfiguration.getStartMessageId(),
readerConfiguration.getStartMessageFromRollbackDurationInSec());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index ca2011cf18a..2860cda0cee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -32,6 +32,7 @@ import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
@@ -173,6 +174,12 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> {
return this;
}
+ @Override
+ public ReaderBuilder<T> messageCrypto(MessageCrypto messageCrypto) {
+ conf.setMessageCrypto(messageCrypto);
+ return this;
+ }
+
@Override
public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 099098fcfab..6f8d4bcbc75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -121,11 +121,15 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
+ if (readerConfiguration.getMessageCrypto() != null) {
+ consumerConfiguration.setMessageCrypto(readerConfiguration.getMessageCrypto());
+ }
+
if (readerConfiguration.getKeyHashRanges() != null) {
consumerConfiguration.setKeySharedPolicy(
- KeySharedPolicy
- .stickyHashRange()
- .ranges(readerConfiguration.getKeyHashRanges())
+ KeySharedPolicy
+ .stickyHashRange()
+ .ranges(readerConfiguration.getKeyHashRanges())
);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 86707d2aa2f..a038383564d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.conf;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.HashSet;
@@ -26,8 +27,11 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.ReaderInterceptor;
@@ -113,6 +117,11 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
)
private ConsumerCryptoFailureAction cryptoFailureAction =
ConsumerCryptoFailureAction.FAIL;
+ @JsonIgnore
+ @Setter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP2"}))
+ @Getter(onMethod_ = @SuppressFBWarnings({"EI_EXPOSE_REP"}))
+ private transient MessageCrypto messageCrypto = null;
+
@ApiModelProperty(
name = "readCompacted",
value = "If enabling `readCompacted`, a consumer reads messages
from a compacted topic rather than a full "
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index 1bbdf274293..0349dbfe265 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -18,23 +18,27 @@
*/
package org.apache.pulsar.client.impl;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
public class ReaderImplTest {
- private ReaderImpl<byte[]> reader;
+ private PulsarClientImpl client;
private ExecutorProvider executorProvider;
private ExecutorService internalExecutor;
@@ -42,17 +46,15 @@ public class ReaderImplTest {
void setupReader() {
executorProvider = new ExecutorProvider(1, "ReaderImplTest");
internalExecutor = Executors.newSingleThreadScheduledExecutor();
- PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(
- executorProvider, internalExecutor);
- ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
- readerConfiguration.setTopicName("topicName");
- CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
- reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
- consumerFuture, Schema.BYTES);
+ client = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
}
@AfterMethod
- public void clean() {
+ public void clean() throws Exception {
+ if (client != null) {
+ client.close();
+ client = null;
+ }
if (executorProvider != null) {
executorProvider.shutdownNow();
executorProvider = null;
@@ -65,6 +67,16 @@ public class ReaderImplTest {
@Test
void shouldSupportCancellingReadNextAsync() {
+ ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
+ readerConfiguration.setTopicName("topicName");
+ CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
+ ReaderImpl<byte[]> reader = new ReaderImpl<>(
+ client,
+ readerConfiguration,
+ ClientTestFixtures.createMockedExecutorProvider(),
+ consumerFuture,
+ Schema.BYTES);
+
// given
CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
Awaitility.await().untilAsserted(() -> {
@@ -77,4 +89,14 @@ public class ReaderImplTest {
// then
assertFalse(reader.getConsumer().hasNextPendingReceive());
}
+
+ @Test
+ public void testReaderBuilderWhenMessageCryptoSet() throws PulsarClientException {
+ ReaderBuilderImpl<byte[]> builder = new ReaderBuilderImpl<>(client, Schema.BYTES);
+ builder.topic("testTopicName");
+ builder.startMessageFromRollbackDuration(1, TimeUnit.SECONDS);
+ builder.messageCrypto(new MessageCryptoBc("ctx1", true));
+ assertNotNull(builder.create());
+ assertNotNull(builder.getConf().getMessageCrypto());
+ }
}