This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e4579d  Pass encryption-context to pulsar-sink if source receive 
encrypted message (#2068)
5e4579d is described below

commit 5e4579d2fbfa70ac0668a298c269e31d3b54ab13
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jul 2 17:51:33 2018 -0700

    Pass encryption-context to pulsar-sink if source receive encrypted message 
(#2068)
    
    * Pass encryption-context to pulsar-sink if source receive encrypted message
    
    * move Encryption-ctx to api-pkg
---
 .../client/api/SimpleProducerConsumerTest.java       |  4 ++--
 .../client/api/ConsumerCryptoFailureAction.java      |  2 +-
 .../java/org/apache/pulsar/client/api/Message.java   | 12 ++++++++++++
 .../org/apache/pulsar/client/impl/ConsumerImpl.java  |  4 ++--
 .../org/apache/pulsar/client/impl/MessageImpl.java   |  2 ++
 .../apache/pulsar/client/impl/MessageRecordImpl.java | 15 +++++++++++++++
 .../apache/pulsar/client/impl/TopicMessageImpl.java  |  9 ++++++++-
 .../apache/pulsar/common/api}/EncryptionContext.java |  2 +-
 .../apache/pulsar/functions/source/PulsarRecord.java |  3 +++
 .../apache/pulsar/functions/source/PulsarSource.java |  2 ++
 pulsar-io/core/pom.xml                               | 12 ++++++++++++
 .../org/apache/pulsar/io/core/RecordContext.java     | 20 ++++++++++++++++++++
 12 files changed, 80 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7fb027d..50d9687 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -55,8 +55,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.EncryptionContext;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index e71b798..3ab48f7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,7 +19,7 @@
 
 package org.apache.pulsar.client.api;
 
-import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 public enum ConsumerCryptoFailureAction {
     FAIL, // This is the default option to fail consume until crypto succeeds
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
index 33be458..d614962 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -20,6 +20,10 @@
 package org.apache.pulsar.client.api;
 
 import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 
 /**
  * The message abstraction used in Pulsar.
@@ -127,4 +131,12 @@ public interface Message<T> {
      * @return the key of the message
      */
     String getKey();
+    
+    /**
+     * {@link EncryptionContext} contains encryption and compression 
information in it using which application can
+     * decrypt consumed message with encrypted-payload.
+     * 
+     * @return
+     */
+    Optional<EncryptionContext> getEncryptionCtx();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e7924cb..6f78119 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,9 +63,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -83,7 +84,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 
 public class ConsumerImpl<T> extends ConsumerBase<T> implements 
ConnectionHandler.Connection {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 4adada5..504cc61 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -330,6 +331,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
         this.messageId = messageId;
     }
     
+    @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return encryptionCtx;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
index a27ac13..ea1d892 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
@@ -18,8 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
 /**
@@ -62,4 +67,14 @@ public abstract class MessageRecordImpl<T, M extends 
MessageId> implements Messa
     public void fail() {
         // no-op
     }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return Optional.empty();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 236c2b9..f7f1f9b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -20,9 +20,11 @@
 package org.apache.pulsar.client.impl;
 
 import java.util.Map;
+import java.util.Optional;
+
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
 
@@ -107,4 +109,9 @@ public class TopicMessageImpl<T> extends 
MessageRecordImpl<T, TopicMessageIdImpl
     public T getValue() {
         return msg.getValue();
     }
+    
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return (msg instanceof MessageImpl) ? ((MessageImpl) 
msg).getEncryptionCtx() : Optional.empty();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
similarity index 97%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
rename to 
pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
index ba7018e..98eaad7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.common.api;
 
 import java.util.Map;
 import java.util.Optional;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 78a7c86..113cf82 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -25,8 +25,10 @@ import lombok.Getter;
 import lombok.ToString;
 
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
 @Data
@@ -42,6 +44,7 @@ public class PulsarRecord<T> implements Record<T> {
     private MessageId messageId;
     private String topicName;
     private Map<String, String> properties;
+    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
     private Runnable failFunction;
     private Runnable ackFunction;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 2190be1..e5100c8 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -25,6 +25,7 @@ import net.jodah.typetools.TypeResolver;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
@@ -133,6 +134,7 @@ public class PulsarSource<T> implements Source<T> {
                 .recordSequence(Utils.getSequenceId(message.getMessageId()))
                 .topicName(topicName)
                 .properties(message.getProperties())
+                .encryptionCtx(message.getEncryptionCtx())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 1f31934..30da3d5 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -35,6 +35,18 @@
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>*</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
 </project>
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
index 09ce8d1..5e12125 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pulsar.io.core;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 /**
  * A source context that can be used by the runtime to interact with source.
  */
@@ -34,6 +40,20 @@ public interface RecordContext {
      * @return Sequence Id associated with the record
      */
     default long getRecordSequence() { return -1L; }
+    
+    /**
+     * Retrieves user-properties attached to record. 
+     * 
+     * @return Map of user-properties
+     */
+    default Map<String, String> getProperties() { return 
Collections.emptyMap();}
+    
+    /**
+     * Retrieves encryption-context that is attached to record. 
+     * 
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    default Optional<EncryptionContext> getEncryptionCtx() { return 
Optional.empty();}
 
     /**
      * Acknowledge that this record is fully processed

Reply via email to