rdhabalia closed pull request #2108: add flatbuffer option to serialize 
kinesis-message in KinesisSink
URL: https://github.com/apache/incubator-pulsar/pull/2108
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 763f62c7a5..33b97481da 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -459,6 +459,8 @@ The Apache Software License, Version 2.0
     - org.inferred-freebuilder-1.14.9.jar
   * Snappy Java
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
+  * Flatbuffers Java
+    - com.google.flatbuffers-flatbuffers-java-1.9.0.jar
 
 
 BSD 3-clause "New" or "Revised" License
diff --git a/pom.xml b/pom.xml
index 862a29d020..690022d77b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -952,6 +952,11 @@ flexible messaging model and an intuitive client 
API.</description>
             
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>
             <exclude>**/ByteBufCodedInputStream.java</exclude>
             <exclude>**/ByteBufCodedOutputStream.java</exclude>
             <exclude>bin/proto/*</exclude>
@@ -1056,6 +1061,13 @@ flexible messaging model and an intuitive client 
API.</description>
             
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
             
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
             <exclude>bin/proto/MLDataFormats_pb2.py</exclude>
+            
+            <!-- pulasr-io-connector kinesis : auto generated files from 
flatbuffer schema -->
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude>
+            
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude>
 
             <!-- These files are BSD licensed files -->
             
<exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
index 98eaad7835..ff359c5b0f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java
@@ -34,7 +34,6 @@
 
     private Map<String, EncryptionKey> keys;
     private byte[] param;
-    private Map<String, String> metadata;
     private String algorithm;
     private CompressionType compressionType;
     private int uncompressedMessageSize;
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index 08c30049ab..f0c2776fab 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -81,6 +81,12 @@
       <version>0.12.8</version>
     </dependency>
        <!-- /kinesis dependencies -->
+       
+    <dependency>
+      <groupId>com.google.flatbuffers</groupId>
+      <artifactId>flatbuffers-java</artifactId>
+      <version>1.9.0</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs 
b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs
new file mode 100644
index 0000000000..f7cc0304ae
--- /dev/null
+++ b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ 
+ /**
+ * Instrunction to generate fb-source
+ * export PULSAR_HOME=<Path where you cloned the pulsar repo>
+ * export KINESIS_IO_MAIN=${PULSAR_HOME}/pulsar-io/kinesis/src/main
+ * Command to build java src:  flatc --java  -o ${KINESIS_IO_MAIN}/java 
${KINESIS_IO_MAIN}/fb/KinesisMessageApi.fbs 
+ * flatc version 1.9.0 (pip install flatbuffers)
+ */
+
+namespace org.apache.pulsar.io.kinesis.fbs;
+
+table KeyValue {
+  key:string;
+  value:string;
+}
+
+table EncryptionKey {
+  key:string;
+  value:[byte];
+  metadata:[KeyValue];
+}
+
+enum CompressionType : byte { NONE, LZ4, ZLIB }
+
+table EncryptionCtx {
+  keys:[EncryptionKey];
+  param:[byte];
+  algo:string;
+  compressionType:CompressionType;
+  uncompressedMessageSize:int;
+  batchSize:int;
+  isBatchMessage:bool=false;
+}
+
+table Message {
+  encryptionCtx:EncryptionCtx;
+  properties:[KeyValue];
+  payload:[byte];
+}
+
+root_type Message;
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index eaa2b91845..f6729abdf8 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -102,7 +102,7 @@ public void write(RecordContext inputRecordContext, byte[] 
value) throws Excepti
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey,
-                
ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value)));
+                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value));
         addCallback(addRecordResult,
                 ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
         if (LOG.isDebugEnabled()) {
@@ -273,12 +273,14 @@ public void refresh() {
         };
     }
 
-    public static byte[] createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
+    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
         if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
-            return Utils.serializeRecordToJson(recordCtx, data).getBytes();
+            return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, 
data).getBytes());
+        } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
+            return Utils.serializeRecordToFlatBuffer(recordCtx, data);
         } else {
             // send raw-message
-            return data;
+            return ByteBuffer.wrap(data);
         }
     }
     
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index e3c8fde263..bf5f2ea883 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -76,6 +76,10 @@ public static KinesisSinkConfig load(Map<String, Object> 
map) throws IOException
          * 
          * 
          */
-        FULL_MESSAGE_IN_JSON;
+        FULL_MESSAGE_IN_JSON,
+        /**
+         * Kinesis sink sends message serialized in flat-buffer.
+         */
+        FULL_MESSAGE_IN_FB;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 82360801db..469151eeb9 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -19,13 +19,22 @@
 
 package org.apache.pulsar.io.kinesis;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Base64.getEncoder;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
+import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
+import org.apache.pulsar.io.kinesis.fbs.KeyValue;
+import org.apache.pulsar.io.kinesis.fbs.Message;
 
+import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.gson.JsonObject;
 
 public class Utils {
@@ -34,13 +43,121 @@
     private static final String PROPERTIES_FIELD = "properties";
     private static final String KEY_MAP_FIELD = "keysMapBase64";
     private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap";
-    private static final String METADATA_FIELD = "metadata";
     private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64";
     private static final String ALGO_FIELD = "algorithm";
     private static final String COMPRESSION_TYPE_FIELD = "compressionType";
     private static final String UNCPRESSED_MSG_SIZE_FIELD = 
"uncompressedMessageSize";
     private static final String BATCH_SIZE_FIELD = "batchSize";
     private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
+    
+    private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new 
FlatBufferBuilder(0);
+
+    /**
+     * Serialize record to flat-buffer. it's not a thread-safe method.
+     * 
+     * @param inputRecordContext
+     * @param data
+     * @return
+     */
+    public static ByteBuffer serializeRecordToFlatBuffer(RecordContext 
inputRecordContext, byte[] data) {
+        DEFAULT_FB_BUILDER.clear();
+        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, 
inputRecordContext, data);
+    }
+    
+    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, RecordContext inputRecordContext, byte[] data) {
+        checkNotNull(inputRecordContext, "record-context can't be null");
+        Optional<EncryptionContext> encryptionCtx = 
inputRecordContext.getEncryptionCtx();
+        Map<String, String> properties = inputRecordContext.getProperties();
+
+        int encryptionCtxOffset = -1;
+        int propertiesOffset = -1;
+
+        if (properties != null && !properties.isEmpty()) {
+            int[] propertiesOffsetArray = new int[properties.size()];
+            int i = 0;
+            for (Entry<String, String> property : properties.entrySet()) {
+                propertiesOffsetArray[i++] = KeyValue.createKeyValue(builder, 
builder.createString(property.getKey()),
+                        builder.createString(property.getValue()));
+            }
+            propertiesOffset = Message.createPropertiesVector(builder, 
propertiesOffsetArray);
+        }
+
+        if (encryptionCtx.isPresent()) {
+            encryptionCtxOffset = createEncryptionCtxOffset(builder, 
encryptionCtx);
+        }
+
+        int payloadOffset = Message.createPayloadVector(builder, data);
+        Message.startMessage(builder);
+        Message.addPayload(builder, payloadOffset);
+        if (encryptionCtxOffset != -1) {
+            Message.addEncryptionCtx(builder, encryptionCtxOffset);
+        }
+        if (propertiesOffset != -1) {
+            Message.addProperties(builder, propertiesOffset);
+        }
+        int endMessage = Message.endMessage(builder);
+        builder.finish(endMessage);
+        ByteBuffer bb = builder.dataBuffer();
+        
+        // to avoid copying of data, use same byte[] wrapped by ByteBuffer. 
But, ByteBuffer.array() returns entire array
+        // so, it requires to read from offset:
+        // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, 
bb.capacity() - space)
+        int space = bb.capacity() - builder.offset(); 
+        return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
+    }
+
+    private static int createEncryptionCtxOffset(final FlatBufferBuilder 
builder, Optional<EncryptionContext> encryptionCtx) {
+        if (!encryptionCtx.isPresent()) {
+            return -1;
+        }
+        // Message.addEncryptionCtx(builder, encryptionCtxOffset);
+        EncryptionContext ctx = encryptionCtx.get();
+        int[] keysOffsets = new int[ctx.getKeys().size()];
+        int keyIndex = 0;
+        for (Entry<String, 
org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> entry : 
ctx.getKeys()
+                .entrySet()) {
+            int key = builder.createString(entry.getKey());
+            int value = EncryptionKey.createValueVector(builder, 
entry.getValue().getKeyValue());
+            Map<String, String> metadata = entry.getValue().getMetadata();
+            int[] metadataOffsets = new int[metadata.size()];
+            int i = 0;
+            for (Entry<String, String> m : metadata.entrySet()) {
+                metadataOffsets[i++] = KeyValue.createKeyValue(builder, 
builder.createString(m.getKey()),
+                        builder.createString(m.getValue()));
+            }
+            int metadataOffset = -1;
+            if (metadata.size() > 0) {
+                metadataOffset = EncryptionKey.createMetadataVector(builder, 
metadataOffsets);
+            }
+            EncryptionKey.startEncryptionKey(builder);
+            EncryptionKey.addKey(builder, key);
+            EncryptionKey.addValue(builder, value);
+            if(metadataOffset!=-1) {
+                EncryptionKey.addMetadata(builder, metadataOffset);            
    
+            }
+            keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
+        }
+
+        int keysOffset = EncryptionCtx.createKeysVector(builder, keysOffsets);
+        int param = EncryptionCtx.createParamVector(builder, ctx.getParam());
+        int algo = builder.createString(ctx.getAlgorithm());
+        int batchSize = ctx.getBatchSize().isPresent() ? 
ctx.getBatchSize().get() : 1;
+        byte compressionType;
+        switch (ctx.getCompressionType()) {
+        case LZ4:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4;
+            break;
+        case ZLIB:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB;
+            break;
+        default:
+            compressionType = 
org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE;
+
+        }
+        return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, 
algo, compressionType,
+                ctx.getUncompressedMessageSize(), batchSize, 
ctx.getBatchSize().isPresent());
+    
+    }
 
     /**
      * Serializes sink-record into json format. It encodes encryption-keys, 
encryption-param and payload in base64
@@ -51,9 +168,7 @@
      * @return
      */
     public static String serializeRecordToJson(RecordContext 
inputRecordContext, byte[] data) {
-        if (inputRecordContext == null) {
-            return null;
-        }
+        checkNotNull(inputRecordContext, "record-context can't be null");
         JsonObject result = new JsonObject();
         result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
         if (inputRecordContext.getProperties() != null) {
@@ -79,12 +194,6 @@ public static String serializeRecordToJson(RecordContext 
inputRecordContext, byt
             });
             encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map);
             encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap);
-            Map<String, String> metadataMap = encryptionCtx.getMetadata();
-            if (metadataMap != null && !metadataMap.isEmpty()) {
-                JsonObject metadata = new JsonObject();
-                encryptionCtx.getMetadata().entrySet().forEach(m -> 
metadata.addProperty(m.getKey(), m.getValue()));
-                encryptionCtxJson.add(METADATA_FIELD, metadata);
-            }
             encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD,
                     getEncoder().encodeToString(encryptionCtx.getParam()));
             encryptionCtxJson.addProperty(ALGO_FIELD, 
encryptionCtx.getAlgorithm());
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
new file mode 100644
index 0000000000..0a90a224d8
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java
@@ -0,0 +1,15 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+public final class CompressionType {
+  private CompressionType() { }
+  public static final byte NONE = 0;
+  public static final byte LZ4 = 1;
+  public static final byte ZLIB = 2;
+
+  public static final String[] names = { "NONE", "LZ4", "ZLIB", };
+
+  public static String name(int e) { return names[e]; }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
new file mode 100644
index 0000000000..e6dff7282e
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java
@@ -0,0 +1,68 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class EncryptionCtx extends Table {
+  public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb) { return 
getRootAsEncryptionCtx(_bb, new EncryptionCtx()); }
+  public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb, 
EncryptionCtx obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public EncryptionCtx __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); 
return this; }
+
+  public EncryptionKey keys(int j) { return keys(new EncryptionKey(), j); }
+  public EncryptionKey keys(EncryptionKey obj, int j) { int o = __offset(4); 
return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int keysLength() { int o = __offset(4); return o != 0 ? 
__vector_len(o) : 0; }
+  public byte param(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int paramLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer paramInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+  public String algo() { int o = __offset(8); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer algoAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
+  public ByteBuffer algoInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 8, 1); }
+  public byte compressionType() { int o = __offset(10); return o != 0 ? 
bb.get(o + bb_pos) : 0; }
+  public int uncompressedMessageSize() { int o = __offset(12); return o != 0 ? 
bb.getInt(o + bb_pos) : 0; }
+  public int batchSize() { int o = __offset(14); return o != 0 ? bb.getInt(o + 
bb_pos) : 0; }
+  public boolean isBatchMessage() { int o = __offset(16); return o != 0 ? 
0!=bb.get(o + bb_pos) : false; }
+
+  public static int createEncryptionCtx(FlatBufferBuilder builder,
+      int keysOffset,
+      int paramOffset,
+      int algoOffset,
+      byte compressionType,
+      int uncompressedMessageSize,
+      int batchSize,
+      boolean isBatchMessage) {
+    builder.startObject(7);
+    EncryptionCtx.addBatchSize(builder, batchSize);
+    EncryptionCtx.addUncompressedMessageSize(builder, uncompressedMessageSize);
+    EncryptionCtx.addAlgo(builder, algoOffset);
+    EncryptionCtx.addParam(builder, paramOffset);
+    EncryptionCtx.addKeys(builder, keysOffset);
+    EncryptionCtx.addIsBatchMessage(builder, isBatchMessage);
+    EncryptionCtx.addCompressionType(builder, compressionType);
+    return EncryptionCtx.endEncryptionCtx(builder);
+  }
+
+  public static void startEncryptionCtx(FlatBufferBuilder builder) { 
builder.startObject(7); }
+  public static void addKeys(FlatBufferBuilder builder, int keysOffset) { 
builder.addOffset(0, keysOffset, 0); }
+  public static int createKeysVector(FlatBufferBuilder builder, int[] data) { 
builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; 
i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startKeysVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(4, numElems, 4); }
+  public static void addParam(FlatBufferBuilder builder, int paramOffset) { 
builder.addOffset(1, paramOffset, 0); }
+  public static int createParamVector(FlatBufferBuilder builder, byte[] data) 
{ builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; 
i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startParamVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(1, numElems, 1); }
+  public static void addAlgo(FlatBufferBuilder builder, int algoOffset) { 
builder.addOffset(2, algoOffset, 0); }
+  public static void addCompressionType(FlatBufferBuilder builder, byte 
compressionType) { builder.addByte(3, compressionType, 0); }
+  public static void addUncompressedMessageSize(FlatBufferBuilder builder, int 
uncompressedMessageSize) { builder.addInt(4, uncompressedMessageSize, 0); }
+  public static void addBatchSize(FlatBufferBuilder builder, int batchSize) { 
builder.addInt(5, batchSize, 0); }
+  public static void addIsBatchMessage(FlatBufferBuilder builder, boolean 
isBatchMessage) { builder.addBoolean(6, isBatchMessage, false); }
+  public static int endEncryptionCtx(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
new file mode 100644
index 0000000000..44d74f49a5
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java
@@ -0,0 +1,52 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class EncryptionKey extends Table {
+  public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb) { return 
getRootAsEncryptionKey(_bb, new EncryptionKey()); }
+  public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb, 
EncryptionKey obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public EncryptionKey __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); 
return this; }
+
+  public String key() { int o = __offset(4); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
+  public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 4, 1); }
+  public byte value(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int valueLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+  public KeyValue metadata(int j) { return metadata(new KeyValue(), j); }
+  public KeyValue metadata(KeyValue obj, int j) { int o = __offset(8); return 
o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int metadataLength() { int o = __offset(8); return o != 0 ? 
__vector_len(o) : 0; }
+
+  public static int createEncryptionKey(FlatBufferBuilder builder,
+      int keyOffset,
+      int valueOffset,
+      int metadataOffset) {
+    builder.startObject(3);
+    EncryptionKey.addMetadata(builder, metadataOffset);
+    EncryptionKey.addValue(builder, valueOffset);
+    EncryptionKey.addKey(builder, keyOffset);
+    return EncryptionKey.endEncryptionKey(builder);
+  }
+
+  public static void startEncryptionKey(FlatBufferBuilder builder) { 
builder.startObject(3); }
+  public static void addKey(FlatBufferBuilder builder, int keyOffset) { 
builder.addOffset(0, keyOffset, 0); }
+  public static void addValue(FlatBufferBuilder builder, int valueOffset) { 
builder.addOffset(1, valueOffset, 0); }
+  public static int createValueVector(FlatBufferBuilder builder, byte[] data) 
{ builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; 
i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startValueVector(FlatBufferBuilder builder, int numElems) 
{ builder.startVector(1, numElems, 1); }
+  public static void addMetadata(FlatBufferBuilder builder, int 
metadataOffset) { builder.addOffset(2, metadataOffset, 0); }
+  public static int createMetadataVector(FlatBufferBuilder builder, int[] 
data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i 
>= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startMetadataVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(4, numElems, 4); }
+  public static int endEncryptionKey(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
new file mode 100644
index 0000000000..8cb53b4865
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java
@@ -0,0 +1,41 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class KeyValue extends Table {
+  public static KeyValue getRootAsKeyValue(ByteBuffer _bb) { return 
getRootAsKeyValue(_bb, new KeyValue()); }
+  public static KeyValue getRootAsKeyValue(ByteBuffer _bb, KeyValue obj) { 
_bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public KeyValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return 
this; }
+
+  public String key() { int o = __offset(4); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
+  public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 4, 1); }
+  public String value() { int o = __offset(6); return o != 0 ? __string(o + 
bb_pos) : null; }
+  public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); 
}
+  public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 6, 1); }
+
+  public static int createKeyValue(FlatBufferBuilder builder,
+      int keyOffset,
+      int valueOffset) {
+    builder.startObject(2);
+    KeyValue.addValue(builder, valueOffset);
+    KeyValue.addKey(builder, keyOffset);
+    return KeyValue.endKeyValue(builder);
+  }
+
+  public static void startKeyValue(FlatBufferBuilder builder) { 
builder.startObject(2); }
+  public static void addKey(FlatBufferBuilder builder, int keyOffset) { 
builder.addOffset(0, keyOffset, 0); }
+  public static void addValue(FlatBufferBuilder builder, int valueOffset) { 
builder.addOffset(1, valueOffset, 0); }
+  public static int endKeyValue(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+}
+
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java
new file mode 100644
index 0000000000..d29171c4b0
--- /dev/null
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java
@@ -0,0 +1,53 @@
+// automatically generated by the FlatBuffers compiler, do not modify
+
+package org.apache.pulsar.io.kinesis.fbs;
+
+import java.nio.*;
+import java.lang.*;
+import java.util.*;
+import com.google.flatbuffers.*;
+
+@SuppressWarnings("unused")
+public final class Message extends Table {
+  public static Message getRootAsMessage(ByteBuffer _bb) { return 
getRootAsMessage(_bb, new Message()); }
+  public static Message getRootAsMessage(ByteBuffer _bb, Message obj) { 
_bb.order(ByteOrder.LITTLE_ENDIAN); return 
(obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
+  public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
+  public Message __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return 
this; }
+
+  public EncryptionCtx encryptionCtx() { return encryptionCtx(new 
EncryptionCtx()); }
+  public EncryptionCtx encryptionCtx(EncryptionCtx obj) { int o = __offset(4); 
return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; }
+  public KeyValue properties(int j) { return properties(new KeyValue(), j); }
+  public KeyValue properties(KeyValue obj, int j) { int o = __offset(6); 
return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
+  public int propertiesLength() { int o = __offset(6); return o != 0 ? 
__vector_len(o) : 0; }
+  public byte payload(int j) { int o = __offset(8); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; }
+  public int payloadLength() { int o = __offset(8); return o != 0 ? 
__vector_len(o) : 0; }
+  public ByteBuffer payloadAsByteBuffer() { return __vector_as_bytebuffer(8, 
1); }
+  public ByteBuffer payloadInByteBuffer(ByteBuffer _bb) { return 
__vector_in_bytebuffer(_bb, 8, 1); }
+
+  public static int createMessage(FlatBufferBuilder builder,
+      int encryptionCtxOffset,
+      int propertiesOffset,
+      int payloadOffset) {
+    builder.startObject(3);
+    Message.addPayload(builder, payloadOffset);
+    Message.addProperties(builder, propertiesOffset);
+    Message.addEncryptionCtx(builder, encryptionCtxOffset);
+    return Message.endMessage(builder);
+  }
+
+  public static void startMessage(FlatBufferBuilder builder) { 
builder.startObject(3); }
+  public static void addEncryptionCtx(FlatBufferBuilder builder, int 
encryptionCtxOffset) { builder.addOffset(0, encryptionCtxOffset, 0); }
+  public static void addProperties(FlatBufferBuilder builder, int 
propertiesOffset) { builder.addOffset(1, propertiesOffset, 0); }
+  public static int createPropertiesVector(FlatBufferBuilder builder, int[] 
data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i 
>= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
+  public static void startPropertiesVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(4, numElems, 4); }
+  public static void addPayload(FlatBufferBuilder builder, int payloadOffset) 
{ builder.addOffset(2, payloadOffset, 0); }
+  public static int createPayloadVector(FlatBufferBuilder builder, byte[] 
data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i 
>= 0; i--) builder.addByte(data[i]); return builder.endVector(); }
+  public static void startPayloadVector(FlatBufferBuilder builder, int 
numElems) { builder.startVector(1, numElems, 1); }
+  public static int endMessage(FlatBufferBuilder builder) {
+    int o = builder.endObject();
+    return o;
+  }
+  public static void finishMessageBuffer(FlatBufferBuilder builder, int 
offset) { builder.finish(offset); }
+  public static void finishSizePrefixedMessageBuffer(FlatBufferBuilder 
builder, int offset) { builder.finishSizePrefixed(offset); }
+}
+
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index e3f1160d32..b9f5c8a6b1 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,15 +20,20 @@
 
 import static java.util.Base64.getDecoder;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.kinesis.fbs.KeyValue;
+import org.apache.pulsar.io.kinesis.fbs.Message;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
@@ -43,71 +48,166 @@
  */
 public class UtilsTest {
 
+    @DataProvider(name = "encryption")
+    public Object[][] encryptionProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+    
     @Test
     public void testJsonSerialization() throws Exception {
 
-        final String key1 = "key1";
-        final String key2 = "key2";
+        final String[] keyNames = { "key1", "key2" };
         final String key1Value = "test1";
         final String key2Value = "test2";
+        final byte[][] keyValues = { key1Value.getBytes(), 
key2Value.getBytes() };
         final String param = "param";
         final String algo = "algo";
+        int batchSize = 10;
+        int compressionMsgSize = 10;
 
-        // prepare encryption-ctx
-        EncryptionContext ctx = new EncryptionContext();
-        ctx.setAlgorithm(algo);
-        ctx.setBatchSize(Optional.of(10));
-        ctx.setCompressionType(CompressionType.LZ4);
-        ctx.setUncompressedMessageSize(10);
-        Map<String, EncryptionKey> keys = Maps.newHashMap();
-        EncryptionKey encKeyVal = new EncryptionKey();
-        encKeyVal.setKeyValue(key1Value.getBytes());
+        // serialize to json
+        byte[] data = "payload".getBytes();
+        Map<String, String> properties = Maps.newHashMap();
+        properties.put("prop1", "value");
         Map<String, String> metadata1 = Maps.newHashMap();
         metadata1.put("version", "v1");
         metadata1.put("ckms", "cmks-1");
-        encKeyVal.setMetadata(metadata1);
-        EncryptionKey encKeyVal2 = new EncryptionKey();
-        encKeyVal2.setKeyValue(key2Value.getBytes());
         Map<String, String> metadata2 = Maps.newHashMap();
         metadata2.put("version", "v2");
         metadata2.put("ckms", "cmks-2");
-        encKeyVal2.setMetadata(metadata2);
-        keys.put(key1, encKeyVal);
-        keys.put(key2, encKeyVal2);
-        ctx.setKeys(keys);
-        ctx.setMetadata(metadata1);
-        ctx.setParam(param.getBytes());
-
-        // serialize to json
-        byte[] data = "payload".getBytes();
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put("prop1", "value");
-        RecordContext recordCtx = new RecordContextImpl(properties, ctx);
+        RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
+                batchSize, compressionMsgSize, properties, true);
         String json = Utils.serializeRecordToJson(recordCtx, data);
-        System.out.println(json);
 
         // deserialize from json and assert
         KinesisMessageResponse kinesisJsonResponse = 
deSerializeRecordFromJson(json);
         Assert.assertEquals(data, 
getDecoder().decode(kinesisJsonResponse.getPayloadBase64()));
         EncryptionCtx encryptionCtxDeser = 
kinesisJsonResponse.getEncryptionCtx();
-        Assert.assertEquals(key1Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1)));
-        Assert.assertEquals(key2Value.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2)));
+        Assert.assertEquals(key1Value.getBytes(),
+                
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[0])));
+        Assert.assertEquals(key2Value.getBytes(),
+                
getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[1])));
         Assert.assertEquals(param.getBytes(), 
getDecoder().decode(encryptionCtxDeser.getEncParamBase64()));
         Assert.assertEquals(algo, encryptionCtxDeser.getAlgorithm());
-        Assert.assertEquals(metadata1, 
encryptionCtxDeser.getKeysMetadataMap().get(key1));
-        Assert.assertEquals(metadata2, 
encryptionCtxDeser.getKeysMetadataMap().get(key2));
-        Assert.assertEquals(metadata1, encryptionCtxDeser.getMetadata());
+        Assert.assertEquals(metadata1, 
encryptionCtxDeser.getKeysMetadataMap().get(keyNames[0]));
+        Assert.assertEquals(metadata2, 
encryptionCtxDeser.getKeysMetadataMap().get(keyNames[1]));
         Assert.assertEquals(properties, kinesisJsonResponse.getProperties());
 
     }
 
+    @Test(dataProvider="encryption")
+    public void testFbSerialization(boolean isEncryption) throws Exception {
+
+        final String[] keyNames = { "key1", "key2" };
+        final String param = "param";
+        final String algo = "algo";
+        int batchSize = 10;
+        int compressionMsgSize = 10;
+
+        for (int k = 0; k < 5; k++) {
+            String payloadString = RandomStringUtils.random(142342 * k, 
String.valueOf(System.currentTimeMillis()));
+            final String key1Value = payloadString + "test1";
+            final String key2Value = payloadString + "test2";
+            final byte[][] keyValues = { key1Value.getBytes(), 
key2Value.getBytes() };
+            byte[] data = payloadString.getBytes();
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("prop1", payloadString);
+            Map<String, String> metadata1 = Maps.newHashMap();
+            metadata1.put("version", "v1");
+            metadata1.put("ckms", "cmks-1");
+            Map<String, String> metadata2 = Maps.newHashMap();
+            metadata2.put("version", "v2");
+            metadata2.put("ckms", "cmks-2");
+            RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1,
+                    metadata2, batchSize, compressionMsgSize, properties, 
isEncryption);
+            ByteBuffer flatBuffer = 
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+
+            Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
+            byte[] fbPayloadBytes = new 
byte[kinesisJsonResponse.payloadLength()];
+            kinesisJsonResponse.payloadAsByteBuffer().get(fbPayloadBytes);
+            Assert.assertEquals(data, fbPayloadBytes);
+
+            if(isEncryption) {
+                org.apache.pulsar.io.kinesis.fbs.EncryptionCtx 
encryptionCtxDeser = kinesisJsonResponse.encryptionCtx();
+                byte compressionType = encryptionCtxDeser.compressionType();
+                int fbBatchSize = encryptionCtxDeser.batchSize();
+                boolean isBathcMessage = encryptionCtxDeser.isBatchMessage();
+                int fbCompressionMsgSize = 
encryptionCtxDeser.uncompressedMessageSize();
+                int totalKeys = encryptionCtxDeser.keysLength();
+                Map<String, Map<String, String>> fbKeyMetadataResult = 
Maps.newHashMap();
+                Map<String, byte[]> fbKeyValueResult = Maps.newHashMap();
+                for (int i = 0; i < encryptionCtxDeser.keysLength(); i++) {
+                    org.apache.pulsar.io.kinesis.fbs.EncryptionKey 
encryptionKey = encryptionCtxDeser.keys(i);
+                    String keyName = encryptionKey.key();
+                    byte[] keyValueBytes = new 
byte[encryptionKey.valueLength()];
+                    encryptionKey.valueAsByteBuffer().get(keyValueBytes);
+                    fbKeyValueResult.put(keyName, keyValueBytes);
+                    Map<String, String> fbMetadata = Maps.newHashMap();
+                    for (int j = 0; j < encryptionKey.metadataLength(); j++) {
+                        KeyValue encMtdata = encryptionKey.metadata(j);
+                        fbMetadata.put(encMtdata.key(), encMtdata.value());
+                    }
+                    fbKeyMetadataResult.put(keyName, fbMetadata);
+                }
+                byte[] paramBytes = new byte[encryptionCtxDeser.paramLength()];
+                encryptionCtxDeser.paramAsByteBuffer().get(paramBytes);
+
+                Assert.assertEquals(totalKeys, 2);
+                Assert.assertEquals(batchSize, fbBatchSize);
+                Assert.assertEquals(isBathcMessage, true);
+                Assert.assertEquals(compressionMsgSize, fbCompressionMsgSize);
+                Assert.assertEquals(keyValues[0], 
fbKeyValueResult.get(keyNames[0]));
+                Assert.assertEquals(keyValues[1], 
fbKeyValueResult.get(keyNames[1]));
+                Assert.assertEquals(metadata1, 
fbKeyMetadataResult.get(keyNames[0]));
+                Assert.assertEquals(metadata2, 
fbKeyMetadataResult.get(keyNames[1]));
+                Assert.assertEquals(compressionType, 
org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4);
+                Assert.assertEquals(param.getBytes(), paramBytes);
+                Assert.assertEquals(algo, encryptionCtxDeser.algo());
+            }
+            
+            Map<String, String> fbproperties = Maps.newHashMap();
+            for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
+                KeyValue property = kinesisJsonResponse.properties(i);
+                fbproperties.put(property.key(), property.value());
+            }
+            Assert.assertEquals(properties, fbproperties);
+
+        }
+    }
+
+    private RecordContext createRecordContext(String algo, String[] keyNames, 
byte[][] keyValues, byte[] param,
+            Map<String, String> metadata1, Map<String, String> metadata2, int 
batchSize, int compressionMsgSize,
+            Map<String, String> properties, boolean isEncryption) {
+        EncryptionContext ctx = null;
+        if(isEncryption) {
+            ctx = new EncryptionContext();
+            ctx.setAlgorithm(algo);
+            ctx.setBatchSize(Optional.of(batchSize));
+            ctx.setCompressionType(CompressionType.LZ4);
+            ctx.setUncompressedMessageSize(compressionMsgSize);
+            Map<String, EncryptionKey> keys = Maps.newHashMap();
+            EncryptionKey encKeyVal = new EncryptionKey();
+            encKeyVal.setKeyValue(keyValues[0]);
+
+            encKeyVal.setMetadata(metadata1);
+            EncryptionKey encKeyVal2 = new EncryptionKey();
+            encKeyVal2.setKeyValue(keyValues[1]);
+            encKeyVal2.setMetadata(metadata2);
+            keys.put(keyNames[0], encKeyVal);
+            keys.put(keyNames[1], encKeyVal2);
+            ctx.setKeys(keys);
+            ctx.setParam(param);
+        }
+        return new RecordContextImpl(properties, Optional.ofNullable(ctx)); 
+    }
+
     class RecordContextImpl implements RecordContext {
         Map<String, String> properties;
         Optional<EncryptionContext> ectx;
 
-        public RecordContextImpl(Map<String, String> properties, 
EncryptionContext ectx) {
+        public RecordContextImpl(Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
             this.properties = properties;
-            this.ectx = Optional.of(ectx);
+            this.ectx = ectx;
         }
 
         public Map<String, String> getProperties() {
@@ -146,8 +246,6 @@ public static KinesisMessageResponse 
deSerializeRecordFromJson(String jsonRecord
         private Map<String, String> keysMapBase64;
         // map of encryption-key metadata
         private Map<String, Map<String, String>> keysMetadataMap;
-        // encryption-ctx metadata
-        private Map<String, String> metadata;
         // encryption param which is base64 encoded
         private String encParamBase64;
         // encryption algorithm


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to