This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e4579d Pass encryption-context to pulsar-sink if source receive
encrypted message (#2068)
5e4579d is described below
commit 5e4579d2fbfa70ac0668a298c269e31d3b54ab13
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jul 2 17:51:33 2018 -0700
Pass encryption-context to pulsar-sink if source receive encrypted message
(#2068)
* Pass encryption-context to pulsar-sink if source receive encrypted message
* move Encryption-ctx to api-pkg
---
.../client/api/SimpleProducerConsumerTest.java | 4 ++--
.../client/api/ConsumerCryptoFailureAction.java | 2 +-
.../java/org/apache/pulsar/client/api/Message.java | 12 ++++++++++++
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++--
.../org/apache/pulsar/client/impl/MessageImpl.java | 2 ++
.../apache/pulsar/client/impl/MessageRecordImpl.java | 15 +++++++++++++++
.../apache/pulsar/client/impl/TopicMessageImpl.java | 9 ++++++++-
.../apache/pulsar/common/api}/EncryptionContext.java | 2 +-
.../apache/pulsar/functions/source/PulsarRecord.java | 3 +++
.../apache/pulsar/functions/source/PulsarSource.java | 2 ++
pulsar-io/core/pom.xml | 12 ++++++++++++
.../org/apache/pulsar/io/core/RecordContext.java | 20 ++++++++++++++++++++
12 files changed, 80 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7fb027d..50d9687 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -55,8 +55,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.EncryptionContext;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index e71b798..3ab48f7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.client.api;
-import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext;
public enum ConsumerCryptoFailureAction {
FAIL, // This is the default option to fail consume until crypto succeeds
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
index 33be458..d614962 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -20,6 +20,10 @@
package org.apache.pulsar.client.api;
import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
/**
* The message abstraction used in Pulsar.
@@ -127,4 +131,12 @@ public interface Message<T> {
* @return the key of the message
*/
String getKey();
+
+ /**
+ * {@link EncryptionContext} contains encryption and compression information in it using which application can
+ * decrypt consumed message with encrypted-payload.
+ *
+ * @return
+ */
+ Optional<EncryptionContext> getEncryptionCtx();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e7924cb..6f78119 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,9 +63,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -83,7 +84,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4adada5..504cc61 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -330,6 +331,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
this.messageId = messageId;
}
+ @Override
public Optional<EncryptionContext> getEncryptionCtx() {
return encryptionCtx;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
index a27ac13..ea1d892 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.client.impl;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.io.core.Record;
/**
@@ -62,4 +67,14 @@ public abstract class MessageRecordImpl<T, M extends MessageId> implements Messa
public void fail() {
// no-op
}
+
+ @Override
+ public Map<String, String> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return Optional.empty();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 236c2b9..f7f1f9b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -20,9 +20,11 @@
package org.apache.pulsar.client.impl;
import java.util.Map;
+import java.util.Optional;
+
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.api.EncryptionContext;
public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl> {
@@ -107,4 +109,9 @@ public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl
public T getValue() {
return msg.getValue();
}
+
+ @Override
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return (msg instanceof MessageImpl) ? ((MessageImpl) msg).getEncryptionCtx() : Optional.empty();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
similarity index 97%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
rename to
pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
index ba7018e..98eaad7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.api;
import java.util.Map;
import java.util.Optional;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 78a7c86..113cf82 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -25,8 +25,10 @@ import lombok.Getter;
import lombok.ToString;
import java.util.Map;
+import java.util.Optional;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.io.core.Record;
@Data
@@ -42,6 +44,7 @@ public class PulsarRecord<T> implements Record<T> {
private MessageId messageId;
private String topicName;
private Map<String, String> properties;
+ private Optional<EncryptionContext> encryptionCtx = Optional.empty();
private Runnable failFunction;
private Runnable ackFunction;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 2190be1..e5100c8 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -25,6 +25,7 @@ import net.jodah.typetools.TypeResolver;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
@@ -133,6 +134,7 @@ public class PulsarSource<T> implements Source<T> {
.recordSequence(Utils.getSequenceId(message.getMessageId()))
.topicName(topicName)
.properties(message.getProperties())
+ .encryptionCtx(message.getEncryptionCtx())
.ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 1f31934..30da3d5 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -35,6 +35,18 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
index 09ce8d1..5e12125 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
@@ -18,6 +18,12 @@
*/
package org.apache.pulsar.io.core;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
/**
* A source context that can be used by the runtime to interact with source.
*/
@@ -34,6 +40,20 @@ public interface RecordContext {
* @return Sequence Id associated with the record
*/
default long getRecordSequence() { return -1L; }
+
+ /**
+ * Retrieves user-properties attached to record.
+ *
+ * @return Map of user-properties
+ */
+ default Map<String, String> getProperties() { return Collections.emptyMap();}
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ default Optional<EncryptionContext> getEncryptionCtx() { return Optional.empty();}
/**
* Acknowledge that this record is fully processed