rdhabalia closed pull request #2108: add flatbuffer option to serialize kinesis-message in KinesisSink. URL: https://github.com/apache/incubator-pulsar/pull/2108
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 763f62c7a5..33b97481da 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -459,6 +459,8 @@ The Apache Software License, Version 2.0 - org.inferred-freebuilder-1.14.9.jar * Snappy Java - org.xerial.snappy-snappy-java-1.1.1.3.jar + * Flatbuffers Java + - com.google.flatbuffers-flatbuffers-java-1.9.0.jar BSD 3-clause "New" or "Revised" License diff --git a/pom.xml b/pom.xml index 862a29d020..690022d77b 100644 --- a/pom.xml +++ b/pom.xml @@ -952,6 +952,11 @@ flexible messaging model and an intuitive client API.</description> <exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude> <exclude>**/ByteBufCodedInputStream.java</exclude> <exclude>**/ByteBufCodedOutputStream.java</exclude> <exclude>bin/proto/*</exclude> @@ -1056,6 +1061,13 @@ flexible messaging model and an intuitive client API.</description> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> 
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>bin/proto/MLDataFormats_pb2.py</exclude> + + <!-- pulsar-io-connector kinesis : auto generated files from flatbuffer schema --> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java</exclude> + <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java</exclude> <!-- These files are BSD licensed files --> <exclude>src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedInputStream.java</exclude> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java index 98eaad7835..ff359c5b0f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/EncryptionContext.java @@ -34,7 +34,6 @@ private Map<String, EncryptionKey> keys; private byte[] param; - private Map<String, String> metadata; private String algorithm; private CompressionType compressionType; private int uncompressedMessageSize; diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml index 08c30049ab..f0c2776fab 100644 --- a/pulsar-io/kinesis/pom.xml +++ b/pulsar-io/kinesis/pom.xml @@ -81,6 +81,12 @@ <version>0.12.8</version> </dependency> <!-- /kinesis dependencies --> + + <dependency> + <groupId>com.google.flatbuffers</groupId> + <artifactId>flatbuffers-java</artifactId> + <version>1.9.0</version> + </dependency> </dependencies> diff --git a/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs new file mode 100644 index 
0000000000..f7cc0304ae --- /dev/null +++ b/pulsar-io/kinesis/src/main/fb/KinesisMessageApi.fbs @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + /** + * Instructions to generate fb-source + * export PULSAR_HOME=<Path where you cloned the pulsar repo> + * export KINESIS_IO_MAIN=${PULSAR_HOME}/pulsar-io/kinesis/src/main + * Command to build java src: flatc --java -o ${KINESIS_IO_MAIN}/java ${KINESIS_IO_MAIN}/fb/KinesisMessageApi.fbs + * flatc version 1.9.0 (pip install flatbuffers) + */ + +namespace org.apache.pulsar.io.kinesis.fbs; + +table KeyValue { + key:string; + value:string; +} + +table EncryptionKey { + key:string; + value:[byte]; + metadata:[KeyValue]; +} + +enum CompressionType : byte { NONE, LZ4, ZLIB } + +table EncryptionCtx { + keys:[EncryptionKey]; + param:[byte]; + algo:string; + compressionType:CompressionType; + uncompressedMessageSize:int; + batchSize:int; + isBatchMessage:bool=false; +} + +table Message { + encryptionCtx:EncryptionCtx; + properties:[KeyValue]; + payload:[byte]; +} + +root_type Message; \ No newline at end of file diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index eaa2b91845..f6729abdf8 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -102,7 +102,7 @@ public void write(RecordContext inputRecordContext, byte[] value) throws Excepti : partitionedKey; // partitionedKey Length must be at least one, and at most 256 ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, partitionedKey, - ByteBuffer.wrap(createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value))); + createKinesisMessage(kinesisSinkConfig.getMessageFormat(), inputRecordContext, value)); addCallback(addRecordResult, ProducerSendCallback.create(this.streamName, inputRecordContext, System.nanoTime()), directExecutor()); if (LOG.isDebugEnabled()) { @@ -273,12 +273,14 @@ public void refresh() { }; } - public static byte[] createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) { + public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, RecordContext recordCtx, byte[] data) { if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) { - return Utils.serializeRecordToJson(recordCtx, data).getBytes(); + return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, data).getBytes()); + } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) { + return Utils.serializeRecordToFlatBuffer(recordCtx, data); } else { // send raw-message - return data; + return ByteBuffer.wrap(data); } } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index e3c8fde263..bf5f2ea883 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -76,6 +76,10 @@ public static KinesisSinkConfig load(Map<String, Object> map) throws IOException * * */ - FULL_MESSAGE_IN_JSON; + FULL_MESSAGE_IN_JSON, + /** + * Kinesis sink sends message serialized in flat-buffer. + */ + FULL_MESSAGE_IN_FB; } } \ No newline at end of file diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java index 82360801db..469151eeb9 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java @@ -19,13 +19,22 @@ package org.apache.pulsar.io.kinesis; +import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Base64.getEncoder; +import java.nio.ByteBuffer; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx; +import org.apache.pulsar.io.kinesis.fbs.EncryptionKey; +import org.apache.pulsar.io.kinesis.fbs.KeyValue; +import org.apache.pulsar.io.kinesis.fbs.Message; +import com.google.flatbuffers.FlatBufferBuilder; import com.google.gson.JsonObject; public class Utils { @@ -34,13 +43,121 @@ private static final String PROPERTIES_FIELD = "properties"; private static final String KEY_MAP_FIELD = "keysMapBase64"; private static final String KEY_METADATA_MAP_FIELD = "keysMetadataMap"; - private static final String METADATA_FIELD = "metadata"; private static final String ENCRYPTION_PARAM_FIELD = "encParamBase64"; private static final String ALGO_FIELD = "algorithm"; private static final String COMPRESSION_TYPE_FIELD = "compressionType"; private static final String UNCPRESSED_MSG_SIZE_FIELD = "uncompressedMessageSize"; private static final String 
BATCH_SIZE_FIELD = "batchSize"; private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx"; + + private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new FlatBufferBuilder(0); + + /** + * Serialize record to flat-buffer. it's not a thread-safe method. + * + * @param inputRecordContext + * @param data + * @return + */ + public static ByteBuffer serializeRecordToFlatBuffer(RecordContext inputRecordContext, byte[] data) { + DEFAULT_FB_BUILDER.clear(); + return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, inputRecordContext, data); + } + + public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, RecordContext inputRecordContext, byte[] data) { + checkNotNull(inputRecordContext, "record-context can't be null"); + Optional<EncryptionContext> encryptionCtx = inputRecordContext.getEncryptionCtx(); + Map<String, String> properties = inputRecordContext.getProperties(); + + int encryptionCtxOffset = -1; + int propertiesOffset = -1; + + if (properties != null && !properties.isEmpty()) { + int[] propertiesOffsetArray = new int[properties.size()]; + int i = 0; + for (Entry<String, String> property : properties.entrySet()) { + propertiesOffsetArray[i++] = KeyValue.createKeyValue(builder, builder.createString(property.getKey()), + builder.createString(property.getValue())); + } + propertiesOffset = Message.createPropertiesVector(builder, propertiesOffsetArray); + } + + if (encryptionCtx.isPresent()) { + encryptionCtxOffset = createEncryptionCtxOffset(builder, encryptionCtx); + } + + int payloadOffset = Message.createPayloadVector(builder, data); + Message.startMessage(builder); + Message.addPayload(builder, payloadOffset); + if (encryptionCtxOffset != -1) { + Message.addEncryptionCtx(builder, encryptionCtxOffset); + } + if (propertiesOffset != -1) { + Message.addProperties(builder, propertiesOffset); + } + int endMessage = Message.endMessage(builder); + builder.finish(endMessage); + ByteBuffer bb = builder.dataBuffer(); + + // to avoid 
copying of data, use same byte[] wrapped by ByteBuffer. But, ByteBuffer.array() returns entire array + // so, it requires to read from offset: + // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, bb.capacity() - space) + int space = bb.capacity() - builder.offset(); + return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space); + } + + private static int createEncryptionCtxOffset(final FlatBufferBuilder builder, Optional<EncryptionContext> encryptionCtx) { + if (!encryptionCtx.isPresent()) { + return -1; + } + // Message.addEncryptionCtx(builder, encryptionCtxOffset); + EncryptionContext ctx = encryptionCtx.get(); + int[] keysOffsets = new int[ctx.getKeys().size()]; + int keyIndex = 0; + for (Entry<String, org.apache.pulsar.common.api.EncryptionContext.EncryptionKey> entry : ctx.getKeys() + .entrySet()) { + int key = builder.createString(entry.getKey()); + int value = EncryptionKey.createValueVector(builder, entry.getValue().getKeyValue()); + Map<String, String> metadata = entry.getValue().getMetadata(); + int[] metadataOffsets = new int[metadata.size()]; + int i = 0; + for (Entry<String, String> m : metadata.entrySet()) { + metadataOffsets[i++] = KeyValue.createKeyValue(builder, builder.createString(m.getKey()), + builder.createString(m.getValue())); + } + int metadataOffset = -1; + if (metadata.size() > 0) { + metadataOffset = EncryptionKey.createMetadataVector(builder, metadataOffsets); + } + EncryptionKey.startEncryptionKey(builder); + EncryptionKey.addKey(builder, key); + EncryptionKey.addValue(builder, value); + if(metadataOffset!=-1) { + EncryptionKey.addMetadata(builder, metadataOffset); + } + keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder); + } + + int keysOffset = EncryptionCtx.createKeysVector(builder, keysOffsets); + int param = EncryptionCtx.createParamVector(builder, ctx.getParam()); + int algo = builder.createString(ctx.getAlgorithm()); + int batchSize = ctx.getBatchSize().isPresent() ? 
ctx.getBatchSize().get() : 1; + byte compressionType; + switch (ctx.getCompressionType()) { + case LZ4: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4; + break; + case ZLIB: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.ZLIB; + break; + default: + compressionType = org.apache.pulsar.io.kinesis.fbs.CompressionType.NONE; + + } + return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, algo, compressionType, + ctx.getUncompressedMessageSize(), batchSize, ctx.getBatchSize().isPresent()); + + } /** * Serializes sink-record into json format. It encodes encryption-keys, encryption-param and payload in base64 @@ -51,9 +168,7 @@ * @return */ public static String serializeRecordToJson(RecordContext inputRecordContext, byte[] data) { - if (inputRecordContext == null) { - return null; - } + checkNotNull(inputRecordContext, "record-context can't be null"); JsonObject result = new JsonObject(); result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data)); if (inputRecordContext.getProperties() != null) { @@ -79,12 +194,6 @@ public static String serializeRecordToJson(RecordContext inputRecordContext, byt }); encryptionCtxJson.add(KEY_MAP_FIELD, keyBase64Map); encryptionCtxJson.add(KEY_METADATA_MAP_FIELD, keyMetadataMap); - Map<String, String> metadataMap = encryptionCtx.getMetadata(); - if (metadataMap != null && !metadataMap.isEmpty()) { - JsonObject metadata = new JsonObject(); - encryptionCtx.getMetadata().entrySet().forEach(m -> metadata.addProperty(m.getKey(), m.getValue())); - encryptionCtxJson.add(METADATA_FIELD, metadata); - } encryptionCtxJson.addProperty(ENCRYPTION_PARAM_FIELD, getEncoder().encodeToString(encryptionCtx.getParam())); encryptionCtxJson.addProperty(ALGO_FIELD, encryptionCtx.getAlgorithm()); diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java new file mode 
100644 index 0000000000..0a90a224d8 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java @@ -0,0 +1,15 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +public final class CompressionType { + private CompressionType() { } + public static final byte NONE = 0; + public static final byte LZ4 = 1; + public static final byte ZLIB = 2; + + public static final String[] names = { "NONE", "LZ4", "ZLIB", }; + + public static String name(int e) { return names[e]; } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java new file mode 100644 index 0000000000..e6dff7282e --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java @@ -0,0 +1,68 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class EncryptionCtx extends Table { + public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb) { return getRootAsEncryptionCtx(_bb, new EncryptionCtx()); } + public static EncryptionCtx getRootAsEncryptionCtx(ByteBuffer _bb, EncryptionCtx obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public EncryptionCtx __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public EncryptionKey keys(int j) { return keys(new EncryptionKey(), j); } + public EncryptionKey keys(EncryptionKey obj, int j) { int o = __offset(4); return o != 0 ? 
obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int keysLength() { int o = __offset(4); return o != 0 ? __vector_len(o) : 0; } + public byte param(int j) { int o = __offset(6); return o != 0 ? bb.get(__vector(o) + j * 1) : 0; } + public int paramLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer paramAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer paramInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + public String algo() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer algoAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer algoInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + public byte compressionType() { int o = __offset(10); return o != 0 ? bb.get(o + bb_pos) : 0; } + public int uncompressedMessageSize() { int o = __offset(12); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public int batchSize() { int o = __offset(14); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public boolean isBatchMessage() { int o = __offset(16); return o != 0 ? 
0!=bb.get(o + bb_pos) : false; } + + public static int createEncryptionCtx(FlatBufferBuilder builder, + int keysOffset, + int paramOffset, + int algoOffset, + byte compressionType, + int uncompressedMessageSize, + int batchSize, + boolean isBatchMessage) { + builder.startObject(7); + EncryptionCtx.addBatchSize(builder, batchSize); + EncryptionCtx.addUncompressedMessageSize(builder, uncompressedMessageSize); + EncryptionCtx.addAlgo(builder, algoOffset); + EncryptionCtx.addParam(builder, paramOffset); + EncryptionCtx.addKeys(builder, keysOffset); + EncryptionCtx.addIsBatchMessage(builder, isBatchMessage); + EncryptionCtx.addCompressionType(builder, compressionType); + return EncryptionCtx.endEncryptionCtx(builder); + } + + public static void startEncryptionCtx(FlatBufferBuilder builder) { builder.startObject(7); } + public static void addKeys(FlatBufferBuilder builder, int keysOffset) { builder.addOffset(0, keysOffset, 0); } + public static int createKeysVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startKeysVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static void addParam(FlatBufferBuilder builder, int paramOffset) { builder.addOffset(1, paramOffset, 0); } + public static int createParamVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startParamVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static void addAlgo(FlatBufferBuilder builder, int algoOffset) { builder.addOffset(2, algoOffset, 0); } + public static void addCompressionType(FlatBufferBuilder builder, byte compressionType) { builder.addByte(3, compressionType, 0); } + public static void 
addUncompressedMessageSize(FlatBufferBuilder builder, int uncompressedMessageSize) { builder.addInt(4, uncompressedMessageSize, 0); } + public static void addBatchSize(FlatBufferBuilder builder, int batchSize) { builder.addInt(5, batchSize, 0); } + public static void addIsBatchMessage(FlatBufferBuilder builder, boolean isBatchMessage) { builder.addBoolean(6, isBatchMessage, false); } + public static int endEncryptionCtx(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java new file mode 100644 index 0000000000..44d74f49a5 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java @@ -0,0 +1,52 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class EncryptionKey extends Table { + public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb) { return getRootAsEncryptionKey(_bb, new EncryptionKey()); } + public static EncryptionKey getRootAsEncryptionKey(ByteBuffer _bb, EncryptionKey obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public EncryptionKey __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } + public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } + public byte value(int j) { int o = __offset(6); return o != 0 ? 
bb.get(__vector(o) + j * 1) : 0; } + public int valueLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + public KeyValue metadata(int j) { return metadata(new KeyValue(), j); } + public KeyValue metadata(KeyValue obj, int j) { int o = __offset(8); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int metadataLength() { int o = __offset(8); return o != 0 ? __vector_len(o) : 0; } + + public static int createEncryptionKey(FlatBufferBuilder builder, + int keyOffset, + int valueOffset, + int metadataOffset) { + builder.startObject(3); + EncryptionKey.addMetadata(builder, metadataOffset); + EncryptionKey.addValue(builder, valueOffset); + EncryptionKey.addKey(builder, keyOffset); + return EncryptionKey.endEncryptionKey(builder); + } + + public static void startEncryptionKey(FlatBufferBuilder builder) { builder.startObject(3); } + public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); } + public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); } + public static int createValueVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startValueVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static void addMetadata(FlatBufferBuilder builder, int metadataOffset) { builder.addOffset(2, metadataOffset, 0); } + public static int createMetadataVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static 
void startMetadataVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static int endEncryptionKey(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java new file mode 100644 index 0000000000..8cb53b4865 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/KeyValue.java @@ -0,0 +1,41 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class KeyValue extends Table { + public static KeyValue getRootAsKeyValue(ByteBuffer _bb) { return getRootAsKeyValue(_bb, new KeyValue()); } + public static KeyValue getRootAsKeyValue(ByteBuffer _bb, KeyValue obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public KeyValue __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; } + public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); } + public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); } + public String value() { int o = __offset(6); return o != 0 ? 
__string(o + bb_pos) : null; } + public ByteBuffer valueAsByteBuffer() { return __vector_as_bytebuffer(6, 1); } + public ByteBuffer valueInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); } + + public static int createKeyValue(FlatBufferBuilder builder, + int keyOffset, + int valueOffset) { + builder.startObject(2); + KeyValue.addValue(builder, valueOffset); + KeyValue.addKey(builder, keyOffset); + return KeyValue.endKeyValue(builder); + } + + public static void startKeyValue(FlatBufferBuilder builder) { builder.startObject(2); } + public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); } + public static void addValue(FlatBufferBuilder builder, int valueOffset) { builder.addOffset(1, valueOffset, 0); } + public static int endKeyValue(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } +} + diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java new file mode 100644 index 0000000000..d29171c4b0 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/fbs/Message.java @@ -0,0 +1,53 @@ +// automatically generated by the FlatBuffers compiler, do not modify + +package org.apache.pulsar.io.kinesis.fbs; + +import java.nio.*; +import java.lang.*; +import java.util.*; +import com.google.flatbuffers.*; + +@SuppressWarnings("unused") +public final class Message extends Table { + public static Message getRootAsMessage(ByteBuffer _bb) { return getRootAsMessage(_bb, new Message()); } + public static Message getRootAsMessage(ByteBuffer _bb, Message obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); } + public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; } + public Message __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; } + + public EncryptionCtx 
encryptionCtx() { return encryptionCtx(new EncryptionCtx()); } + public EncryptionCtx encryptionCtx(EncryptionCtx obj) { int o = __offset(4); return o != 0 ? obj.__assign(__indirect(o + bb_pos), bb) : null; } + public KeyValue properties(int j) { return properties(new KeyValue(), j); } + public KeyValue properties(KeyValue obj, int j) { int o = __offset(6); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int propertiesLength() { int o = __offset(6); return o != 0 ? __vector_len(o) : 0; } + public byte payload(int j) { int o = __offset(8); return o != 0 ? bb.get(__vector(o) + j * 1) : 0; } + public int payloadLength() { int o = __offset(8); return o != 0 ? __vector_len(o) : 0; } + public ByteBuffer payloadAsByteBuffer() { return __vector_as_bytebuffer(8, 1); } + public ByteBuffer payloadInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); } + + public static int createMessage(FlatBufferBuilder builder, + int encryptionCtxOffset, + int propertiesOffset, + int payloadOffset) { + builder.startObject(3); + Message.addPayload(builder, payloadOffset); + Message.addProperties(builder, propertiesOffset); + Message.addEncryptionCtx(builder, encryptionCtxOffset); + return Message.endMessage(builder); + } + + public static void startMessage(FlatBufferBuilder builder) { builder.startObject(3); } + public static void addEncryptionCtx(FlatBufferBuilder builder, int encryptionCtxOffset) { builder.addOffset(0, encryptionCtxOffset, 0); } + public static void addProperties(FlatBufferBuilder builder, int propertiesOffset) { builder.addOffset(1, propertiesOffset, 0); } + public static int createPropertiesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startPropertiesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static 
void addPayload(FlatBufferBuilder builder, int payloadOffset) { builder.addOffset(2, payloadOffset, 0); } + public static int createPayloadVector(FlatBufferBuilder builder, byte[] data) { builder.startVector(1, data.length, 1); for (int i = data.length - 1; i >= 0; i--) builder.addByte(data[i]); return builder.endVector(); } + public static void startPayloadVector(FlatBufferBuilder builder, int numElems) { builder.startVector(1, numElems, 1); } + public static int endMessage(FlatBufferBuilder builder) { + int o = builder.endObject(); + return o; + } + public static void finishMessageBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); } + public static void finishSizePrefixedMessageBuffer(FlatBufferBuilder builder, int offset) { builder.finishSizePrefixed(offset); } +} + diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java index e3f1160d32..b9f5c8a6b1 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java @@ -20,15 +20,20 @@ import static java.util.Base64.getDecoder; +import java.nio.ByteBuffer; import java.util.Map; import java.util.Optional; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.api.EncryptionContext; import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType; import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.kinesis.fbs.KeyValue; +import org.apache.pulsar.io.kinesis.fbs.Message; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import org.testng.collections.Maps; @@ -43,71 +48,166 @@ */ public class UtilsTest { + @DataProvider(name = "encryption") + public Object[][] 
encryptionProvider() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + @Test public void testJsonSerialization() throws Exception { - final String key1 = "key1"; - final String key2 = "key2"; + final String[] keyNames = { "key1", "key2" }; final String key1Value = "test1"; final String key2Value = "test2"; + final byte[][] keyValues = { key1Value.getBytes(), key2Value.getBytes() }; final String param = "param"; final String algo = "algo"; + int batchSize = 10; + int compressionMsgSize = 10; - // prepare encryption-ctx - EncryptionContext ctx = new EncryptionContext(); - ctx.setAlgorithm(algo); - ctx.setBatchSize(Optional.of(10)); - ctx.setCompressionType(CompressionType.LZ4); - ctx.setUncompressedMessageSize(10); - Map<String, EncryptionKey> keys = Maps.newHashMap(); - EncryptionKey encKeyVal = new EncryptionKey(); - encKeyVal.setKeyValue(key1Value.getBytes()); + // serialize to json + byte[] data = "payload".getBytes(); + Map<String, String> properties = Maps.newHashMap(); + properties.put("prop1", "value"); Map<String, String> metadata1 = Maps.newHashMap(); metadata1.put("version", "v1"); metadata1.put("ckms", "cmks-1"); - encKeyVal.setMetadata(metadata1); - EncryptionKey encKeyVal2 = new EncryptionKey(); - encKeyVal2.setKeyValue(key2Value.getBytes()); Map<String, String> metadata2 = Maps.newHashMap(); metadata2.put("version", "v2"); metadata2.put("ckms", "cmks-2"); - encKeyVal2.setMetadata(metadata2); - keys.put(key1, encKeyVal); - keys.put(key2, encKeyVal2); - ctx.setKeys(keys); - ctx.setMetadata(metadata1); - ctx.setParam(param.getBytes()); - - // serialize to json - byte[] data = "payload".getBytes(); - Map<String, String> properties = Maps.newHashMap(); - properties.put("prop1", "value"); - RecordContext recordCtx = new RecordContextImpl(properties, ctx); + RecordContext recordCtx = createRecordContext(algo, keyNames, keyValues, param.getBytes(), metadata1, metadata2, + batchSize, compressionMsgSize, properties, true); String json = 
Utils.serializeRecordToJson(recordCtx, data); - System.out.println(json); // deserialize from json and assert KinesisMessageResponse kinesisJsonResponse = deSerializeRecordFromJson(json); Assert.assertEquals(data, getDecoder().decode(kinesisJsonResponse.getPayloadBase64())); EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.getEncryptionCtx(); - Assert.assertEquals(key1Value.getBytes(), getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key1))); - Assert.assertEquals(key2Value.getBytes(), getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(key2))); + Assert.assertEquals(key1Value.getBytes(), + getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[0]))); + Assert.assertEquals(key2Value.getBytes(), + getDecoder().decode(encryptionCtxDeser.getKeysMapBase64().get(keyNames[1]))); Assert.assertEquals(param.getBytes(), getDecoder().decode(encryptionCtxDeser.getEncParamBase64())); Assert.assertEquals(algo, encryptionCtxDeser.getAlgorithm()); - Assert.assertEquals(metadata1, encryptionCtxDeser.getKeysMetadataMap().get(key1)); - Assert.assertEquals(metadata2, encryptionCtxDeser.getKeysMetadataMap().get(key2)); - Assert.assertEquals(metadata1, encryptionCtxDeser.getMetadata()); + Assert.assertEquals(metadata1, encryptionCtxDeser.getKeysMetadataMap().get(keyNames[0])); + Assert.assertEquals(metadata2, encryptionCtxDeser.getKeysMetadataMap().get(keyNames[1])); Assert.assertEquals(properties, kinesisJsonResponse.getProperties()); } + @Test(dataProvider="encryption") + public void testFbSerialization(boolean isEncryption) throws Exception { + + final String[] keyNames = { "key1", "key2" }; + final String param = "param"; + final String algo = "algo"; + int batchSize = 10; + int compressionMsgSize = 10; + + for (int k = 0; k < 5; k++) { + String payloadString = RandomStringUtils.random(142342 * k, String.valueOf(System.currentTimeMillis())); + final String key1Value = payloadString + "test1"; + final String key2Value = payloadString + 
"test2"; + final byte[][] keyValues = { key1Value.getBytes(), key2Value.getBytes() }; + byte[] data = payloadString.getBytes(); + Map<String, String> properties = Maps.newHashMap(); + properties.put("prop1", payloadString); + Map<String, String> metadata1 = Maps.newHashMap(); + metadata1.put("version", "v1"); + metadata1.put("ckms", "cmks-1"); + Map<String, String> metadata2 = Maps.newHashMap(); + metadata2.put("version", "v2"); + metadata2.put("ckms", "cmks-2"); + RecordContext recordCtx = createRecordContext(algo, keyNames, keyValues, param.getBytes(), metadata1, + metadata2, batchSize, compressionMsgSize, properties, isEncryption); + ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(recordCtx, data); + + Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer); + byte[] fbPayloadBytes = new byte[kinesisJsonResponse.payloadLength()]; + kinesisJsonResponse.payloadAsByteBuffer().get(fbPayloadBytes); + Assert.assertEquals(data, fbPayloadBytes); + + if(isEncryption) { + org.apache.pulsar.io.kinesis.fbs.EncryptionCtx encryptionCtxDeser = kinesisJsonResponse.encryptionCtx(); + byte compressionType = encryptionCtxDeser.compressionType(); + int fbBatchSize = encryptionCtxDeser.batchSize(); + boolean isBathcMessage = encryptionCtxDeser.isBatchMessage(); + int fbCompressionMsgSize = encryptionCtxDeser.uncompressedMessageSize(); + int totalKeys = encryptionCtxDeser.keysLength(); + Map<String, Map<String, String>> fbKeyMetadataResult = Maps.newHashMap(); + Map<String, byte[]> fbKeyValueResult = Maps.newHashMap(); + for (int i = 0; i < encryptionCtxDeser.keysLength(); i++) { + org.apache.pulsar.io.kinesis.fbs.EncryptionKey encryptionKey = encryptionCtxDeser.keys(i); + String keyName = encryptionKey.key(); + byte[] keyValueBytes = new byte[encryptionKey.valueLength()]; + encryptionKey.valueAsByteBuffer().get(keyValueBytes); + fbKeyValueResult.put(keyName, keyValueBytes); + Map<String, String> fbMetadata = Maps.newHashMap(); + for (int j = 0; j < 
encryptionKey.metadataLength(); j++) { + KeyValue encMtdata = encryptionKey.metadata(j); + fbMetadata.put(encMtdata.key(), encMtdata.value()); + } + fbKeyMetadataResult.put(keyName, fbMetadata); + } + byte[] paramBytes = new byte[encryptionCtxDeser.paramLength()]; + encryptionCtxDeser.paramAsByteBuffer().get(paramBytes); + + Assert.assertEquals(totalKeys, 2); + Assert.assertEquals(batchSize, fbBatchSize); + Assert.assertEquals(isBathcMessage, true); + Assert.assertEquals(compressionMsgSize, fbCompressionMsgSize); + Assert.assertEquals(keyValues[0], fbKeyValueResult.get(keyNames[0])); + Assert.assertEquals(keyValues[1], fbKeyValueResult.get(keyNames[1])); + Assert.assertEquals(metadata1, fbKeyMetadataResult.get(keyNames[0])); + Assert.assertEquals(metadata2, fbKeyMetadataResult.get(keyNames[1])); + Assert.assertEquals(compressionType, org.apache.pulsar.io.kinesis.fbs.CompressionType.LZ4); + Assert.assertEquals(param.getBytes(), paramBytes); + Assert.assertEquals(algo, encryptionCtxDeser.algo()); + } + + Map<String, String> fbproperties = Maps.newHashMap(); + for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) { + KeyValue property = kinesisJsonResponse.properties(i); + fbproperties.put(property.key(), property.value()); + } + Assert.assertEquals(properties, fbproperties); + + } + } + + private RecordContext createRecordContext(String algo, String[] keyNames, byte[][] keyValues, byte[] param, + Map<String, String> metadata1, Map<String, String> metadata2, int batchSize, int compressionMsgSize, + Map<String, String> properties, boolean isEncryption) { + EncryptionContext ctx = null; + if(isEncryption) { + ctx = new EncryptionContext(); + ctx.setAlgorithm(algo); + ctx.setBatchSize(Optional.of(batchSize)); + ctx.setCompressionType(CompressionType.LZ4); + ctx.setUncompressedMessageSize(compressionMsgSize); + Map<String, EncryptionKey> keys = Maps.newHashMap(); + EncryptionKey encKeyVal = new EncryptionKey(); + encKeyVal.setKeyValue(keyValues[0]); + + 
encKeyVal.setMetadata(metadata1); + EncryptionKey encKeyVal2 = new EncryptionKey(); + encKeyVal2.setKeyValue(keyValues[1]); + encKeyVal2.setMetadata(metadata2); + keys.put(keyNames[0], encKeyVal); + keys.put(keyNames[1], encKeyVal2); + ctx.setKeys(keys); + ctx.setParam(param); + } + return new RecordContextImpl(properties, Optional.ofNullable(ctx)); + } + class RecordContextImpl implements RecordContext { Map<String, String> properties; Optional<EncryptionContext> ectx; - public RecordContextImpl(Map<String, String> properties, EncryptionContext ectx) { + public RecordContextImpl(Map<String, String> properties, Optional<EncryptionContext> ectx) { this.properties = properties; - this.ectx = Optional.of(ectx); + this.ectx = ectx; } public Map<String, String> getProperties() { @@ -146,8 +246,6 @@ public static KinesisMessageResponse deSerializeRecordFromJson(String jsonRecord private Map<String, String> keysMapBase64; // map of encryption-key metadata private Map<String, Map<String, String>> keysMetadataMap; - // encryption-ctx metadata - private Map<String, String> metadata; // encryption param which is base64 encoded private String encParamBase64; // encryption algorithm ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services