codelipenghui commented on code in PR #20958:
URL: https://github.com/apache/pulsar/pull/20958#discussion_r1300963969
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1739,7 +1739,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData
messageId, int redeliveryCo
if (conf.getCryptoKeyReader() == null) {
switch (conf.getCryptoFailureAction()) {
case CONSUME:
- log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
+ log.info("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
Review Comment:
Should we change to debug to avoid the message level logs?
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
// Base64 encoded serialized schema for payload
public String valueSchema;
+
+ // Compression type. Do not set it if compression is not performed.
+ public String encryptionParam;
+
+ // The size of the payload before compression. Do not set it if
compression is not performed.
+ public CompressionType compressionType;
+
+ // Base64 encoded serialized initialization vector used when the client
encrypts.
Review Comment:
It's an integer, shouldn't be base64 encoded?
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
// Base64 encoded serialized schema for payload
public String valueSchema;
+
+ // Compression type. Do not set it if compression is not performed.
Review Comment:
It's not compression type
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
// Base64 encoded serialized schema for payload
public String valueSchema;
+
+ // Compression type. Do not set it if compression is not performed.
+ public String encryptionParam;
+
+ // The size of the payload before compression. Do not set it if
compression is not performed.
Review Comment:
Ah, this one is for `uncompressedMessageSize`
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]>
getProducerBuilder(PulsarClient client) {
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
- if (queryParams.containsKey("batchingEnabled")) {
-
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+ if (queryParams.containsKey("messageRoutingMode")) {
+ checkArgument(
+ Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
+ "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
+ MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+ if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+ builder.messageRoutingMode(routingMode);
+ }
Review Comment:
Is it a part of the proposal?
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -356,7 +479,27 @@ protected ProducerBuilder<byte[]>
getProducerBuilder(PulsarClient client) {
builder.addEncryptionKey(key);
}
}
- return builder;
+ }
+
+ private void printWarnLogIfSettingBatchedParams() {
+ if (clientSideEncrypt &&
queryParams.containsKey("batchingMaxMessages")) {
+ log.warn("Since clientSideEncrypt is true, the param
batchingEnabled of producer will be ignored");
+ }
+ if (queryParams.containsKey("batchingMaxMessages")) {
+ log.warn("Since batchingEnabled is false, the param
batchingMaxMessages of producer will be ignored");
+ }
+ if (queryParams.containsKey("maxPendingMessages")) {
+ log.warn("Since batchingEnabled is false, the param
maxPendingMessages of producer will be ignored");
+ }
+ if (queryParams.containsKey("batchingMaxPublishDelay")) {
+ log.warn("Since batchingEnabled is false, the param
batchingMaxPublishDelay of producer will be ignored");
+ }
+ }
Review Comment:
It should be removed, right? It's not a part of the proposal
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java:
##########
@@ -35,4 +35,5 @@ public class ConsumerMessage {
public EncryptionContext encryptionContext;
public String key;
+ // TODO support ackSet in the future.
Review Comment:
We can remove it first. Not sure if it is necessary to support ackSet.
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]>
getProducerBuilder(PulsarClient client) {
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
- if (queryParams.containsKey("batchingEnabled")) {
-
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+ if (queryParams.containsKey("messageRoutingMode")) {
+ checkArgument(
+ Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
+ "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
+ MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+ if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+ builder.messageRoutingMode(routingMode);
+ }
}
- if (queryParams.containsKey("batchingMaxMessages")) {
-
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+ Map<String, EncryptionKey> encryptionKeyMap =
tryToExtractJsonEncryptionKeys();
+ if (encryptionKeyMap != null) {
+ popularProducerBuilderForClientSideEncrypt(builder,
encryptionKeyMap);
+ } else {
+ popularProducerBuilderForServerSideEncrypt(builder);
}
+ return builder;
+ }
- if (queryParams.containsKey("maxPendingMessages")) {
-
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+ private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+ if (!queryParams.containsKey("encryptionKeys")) {
+ return null;
+ }
+ // Base64 decode.
+ byte[] param = null;
+ try {
+ param =
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+ } catch (Exception base64DecodeEx) {
+ log.error("Could not base64 decode the param encryptionKeys",
base64DecodeEx);
+ throw new IllegalArgumentException("Could not base64 decode the
param encryptionKeys");
+ }
+ try {
+ Map<String, EncryptionKey> keys =
ObjectMapperFactory.getMapper().getObjectMapper()
+ .readValue(param, new TypeReference<Map<String,
EncryptionKey>>() {
+ });
+ if (keys.isEmpty()) {
+ return null;
+ }
+ if (keys.values().iterator().next().getKeyValue() == null) {
+ return null;
+ }
+ return keys;
+ } catch (IOException ex) {
+ return null;
}
+ }
- if (queryParams.containsKey("batchingMaxPublishDelay")) {
-
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
- TimeUnit.MILLISECONDS);
+ private void
popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+ Map<String,
EncryptionKey> encryptionKeyMap) {
+ this.clientSideEncrypt = true;
+ int keysLen = encryptionKeyMap.size();
+ final String[] keyNameArray = new String[keysLen];
+ final byte[][] keyValueArray = new byte[keysLen][];
+ final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+ // Format keys.
+ int index = 0;
+ for (Map.Entry<String, EncryptionKey> entry :
encryptionKeyMap.entrySet()) {
+ checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param
encryptionKeys.key");
+ checkArgument(entry.getValue() != null, "Empty param
encryptionKeys.value");
+ checkArgument(entry.getValue().getKeyValue() != null, "Empty param
encryptionKeys.value.keyValue");
+ keyNameArray[index] = StringUtils.trim(entry.getKey());
+ keyValueArray[index] = entry.getValue().getKeyValue();
+ if (entry.getValue().getMetadata() == null) {
+ keyMetadataArray[index] = Collections.emptyList();
+ } else {
+ keyMetadataArray[index] =
entry.getValue().getMetadata().entrySet().stream()
+ .map(e -> new
KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+ .collect(Collectors.toList());
+ }
+ builder.addEncryptionKey(keyNameArray[index]);
}
+ // Disable server-side batch process, compression, and encryption, and
only set the message metadata that
+ // which client-side provided.
+ builder.enableBatching(false);
+ builder.compressionType(CompressionType.NONE);
+ builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
Review Comment:
Please add a comment for these two lines to explain why we are using NONE
compressor and DummyCryptoKeyReader.
##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]>
getProducerBuilder(PulsarClient client) {
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
- if (queryParams.containsKey("batchingEnabled")) {
-
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+ if (queryParams.containsKey("messageRoutingMode")) {
+ checkArgument(
+ Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
+ "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
+ MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+ if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+ builder.messageRoutingMode(routingMode);
+ }
}
- if (queryParams.containsKey("batchingMaxMessages")) {
-
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+ Map<String, EncryptionKey> encryptionKeyMap =
tryToExtractJsonEncryptionKeys();
+ if (encryptionKeyMap != null) {
+ popularProducerBuilderForClientSideEncrypt(builder,
encryptionKeyMap);
+ } else {
+ popularProducerBuilderForServerSideEncrypt(builder);
}
+ return builder;
+ }
- if (queryParams.containsKey("maxPendingMessages")) {
-
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+ private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+ if (!queryParams.containsKey("encryptionKeys")) {
+ return null;
+ }
+ // Base64 decode.
+ byte[] param = null;
+ try {
+ param =
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+ } catch (Exception base64DecodeEx) {
+ log.error("Could not base64 decode the param encryptionKeys",
base64DecodeEx);
+ throw new IllegalArgumentException("Could not base64 decode the
param encryptionKeys");
+ }
+ try {
+ Map<String, EncryptionKey> keys =
ObjectMapperFactory.getMapper().getObjectMapper()
+ .readValue(param, new TypeReference<Map<String,
EncryptionKey>>() {
+ });
+ if (keys.isEmpty()) {
+ return null;
+ }
+ if (keys.values().iterator().next().getKeyValue() == null) {
+ return null;
+ }
+ return keys;
+ } catch (IOException ex) {
+ return null;
}
+ }
- if (queryParams.containsKey("batchingMaxPublishDelay")) {
-
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
- TimeUnit.MILLISECONDS);
+ private void
popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+ Map<String,
EncryptionKey> encryptionKeyMap) {
+ this.clientSideEncrypt = true;
+ int keysLen = encryptionKeyMap.size();
+ final String[] keyNameArray = new String[keysLen];
+ final byte[][] keyValueArray = new byte[keysLen][];
+ final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+ // Format keys.
+ int index = 0;
+ for (Map.Entry<String, EncryptionKey> entry :
encryptionKeyMap.entrySet()) {
+ checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param
encryptionKeys.key");
+ checkArgument(entry.getValue() != null, "Empty param
encryptionKeys.value");
+ checkArgument(entry.getValue().getKeyValue() != null, "Empty param
encryptionKeys.value.keyValue");
+ keyNameArray[index] = StringUtils.trim(entry.getKey());
+ keyValueArray[index] = entry.getValue().getKeyValue();
+ if (entry.getValue().getMetadata() == null) {
+ keyMetadataArray[index] = Collections.emptyList();
+ } else {
+ keyMetadataArray[index] =
entry.getValue().getMetadata().entrySet().stream()
+ .map(e -> new
KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+ .collect(Collectors.toList());
+ }
+ builder.addEncryptionKey(keyNameArray[index]);
}
+ // Disable server-side batch process, compression, and encryption, and
only set the message metadata that
+ // which client-side provided.
+ builder.enableBatching(false);
+ builder.compressionType(CompressionType.NONE);
+ builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
+ // Disable chunking.
+ builder.enableChunking(false);
+ // Inject encryption metadata decorator.
+ builder.messageCrypto(new WSSDummyMessageCryptoImpl(msgMetadata -> {
+ for (int i = 0; i < keyNameArray.length; i++) {
+
msgMetadata.addEncryptionKey().setKey(keyNameArray[i]).setValue(keyValueArray[i])
+ .addAllMetadatas(keyMetadataArray[i]);
+ }
+ }));
+ // Do warning param check and print warning log.
+ printWarnLogIfSettingBatchedParams();
+ printWarnLogIfSettingCompressionParams();
+ }
- if (queryParams.containsKey("messageRoutingMode")) {
- checkArgument(
- Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
- "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
- MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
- if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
- builder.messageRoutingMode(routingMode);
+ private void
popularProducerBuilderForServerSideEncrypt(ProducerBuilder<byte[]> builder) {
+ this.clientSideEncrypt = false;
+ if (queryParams.containsKey("batchingEnabled")) {
+ boolean batchingEnabled =
Boolean.parseBoolean(queryParams.get("batchingEnabled"));
+ if (batchingEnabled) {
+ builder.enableBatching(true);
+ if (queryParams.containsKey("batchingMaxMessages")) {
+
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+ }
+
+ if (queryParams.containsKey("maxPendingMessages")) {
+
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+ }
+
+ if (queryParams.containsKey("batchingMaxPublishDelay")) {
+
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+ TimeUnit.MILLISECONDS);
+ }
+ } else {
+ builder.enableBatching(false);
+ printWarnLogIfSettingBatchedParams();
Review Comment:
Should be removed?
##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECDSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECIES;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA_TRANS;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PublicKey;
+import java.security.spec.AlgorithmParameterSpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+@Slf4j
+public class WssClientSideEncryptUtils {
+
+ public static Charset UTF8 = StandardCharsets.UTF_8;
+
+ public static String base64AndUrlEncode(String str) {
+ return base64AndUrlEncode(str.getBytes(UTF8), UTF8);
+ }
+
+ public static String base64AndUrlEncode(String str, Charset charset) {
+ return base64AndUrlEncode(str.getBytes(charset), charset);
+ }
+
+ public static String base64Encode(String str, Charset charset) {
+ return Base64.getEncoder().encodeToString(str.getBytes(charset));
+ }
+
+ public static String base64Encode(String str) {
+ return Base64.getEncoder().encodeToString(str.getBytes(UTF8));
+ }
+
+ public static byte[] base64Decode(String str) {
+ return Base64.getDecoder().decode(str);
+ }
+
+ public static String base64Encode(byte[] byteArray) {
+ return Base64.getEncoder().encodeToString(byteArray);
+ }
+
+ public static String base64AndUrlEncode(byte[] byteArray) {
+ String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+ return URLEncoder.encode(base64Encode, UTF8);
+ }
+
+ public static String base64AndUrlEncode(byte[] byteArray, Charset charset)
{
+ String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+ return URLEncoder.encode(base64Encode, charset);
+ }
+
+ public static String urlEncode(String str) {
+ return URLEncoder.encode(str, UTF8);
+ }
+
+ public static String urlEncode(String str, Charset charset) {
+ return URLEncoder.encode(str, charset);
+ }
+
+ public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto,
CryptoKeyReader cryptoKeyReader,
+ String publicKeyName)
+ throws PulsarClientException.CryptoException {
+ EncryptionKeyInfo encryptionKeyInfo =
cryptoKeyReader.getPublicKey(publicKeyName, Collections.emptyMap());
+ return calculateEncryptedKeyValue(msgCrypto,
encryptionKeyInfo.getKey());
+ }
+
+ public static String toJSONAndBase64AndUrlEncode(Object obj)
+ throws PulsarClientException.CryptoException {
+ try {
+ String json = ObjectMapperFactory.getMapper().getObjectMapper()
+ .writeValueAsString(obj);
+ return urlEncode(base64Encode(json));
+ } catch (JsonProcessingException e) {
+ throw new
PulsarClientException.CryptoException(String.format("Serialize object %s
failed", obj));
+ }
+ }
+
+ public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto,
EncryptionKeyInfo encryptionKeyInfo)
+ throws Exception {
+ return calculateEncryptedKeyValue(msgCrypto,
encryptionKeyInfo.getKey());
+ }
+
+ public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto,
byte[] publicKeyData)
+ throws PulsarClientException.CryptoException {
+ try {
+ PublicKey pubKey = MessageCryptoBc.loadPublicKey(publicKeyData);
+ Cipher dataKeyCipher = loadAndInitCipher(pubKey);
+ return dataKeyCipher.doFinal(msgCrypto.getDataKey().getEncoded());
+ } catch (Exception e) {
+ log.error("Failed to encrypt data key. {}", e.getMessage());
+ throw new PulsarClientException.CryptoException(e.getMessage());
+ }
+ }
+
+ private static Cipher loadAndInitCipher(PublicKey pubKey) throws
PulsarClientException.CryptoException,
+ NoSuchAlgorithmException, NoSuchProviderException,
NoSuchPaddingException, InvalidKeyException,
+ InvalidAlgorithmParameterException {
+ Cipher dataKeyCipher = null;
+ AlgorithmParameterSpec params = null;
+ // Encrypt data key using public key
+ if (RSA.equals(pubKey.getAlgorithm())) {
+ dataKeyCipher = Cipher.getInstance(RSA_TRANS,
BouncyCastleProvider.PROVIDER_NAME);
+ } else if (ECDSA.equals(pubKey.getAlgorithm())) {
+ dataKeyCipher = Cipher.getInstance(ECIES,
BouncyCastleProvider.PROVIDER_NAME);
+ params = MessageCryptoBc.createIESParameterSpec();
+ } else {
+ String msg = "Unsupported key type " + pubKey.getAlgorithm();
+ log.error(msg);
+ throw new PulsarClientException.CryptoException(msg);
+ }
+ if (params != null) {
+ dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey, params);
+ } else {
+ dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey);
+ }
+ return dataKeyCipher;
+ }
+
+ public static byte[] compressionIfNeeded(CompressionType compressionType,
byte[] payload) {
+ if (compressionType != null &&
!CompressionType.NONE.equals(compressionType)) {
+ CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf input =
PulsarByteBufAllocator.DEFAULT.buffer(payload.length, payload.length);
+ input.writeBytes(payload);
+ ByteBuf output = codec.encode(input);
+ input.release();
+ byte[] res = new byte[output.readableBytes()];
+ output.readBytes(res);
+ output.release();
+ return res;
+ }
+ return payload;
+ }
+
+ public static EncryptedPayloadAndParam encryptPayload(CryptoKeyReader
cryptoKeyReader, MessageCryptoBc msgCrypto,
+ byte[] payload,
String keyName)
+ throws PulsarClientException {
+ ByteBuffer unEncryptedMessagePayload = ByteBuffer.wrap(payload);
+ ByteBuffer encryptedMessagePayload =
ByteBuffer.allocate(unEncryptedMessagePayload.remaining() + 512);
+ MessageMetadata ignoredMessageMetadata = new MessageMetadata();
+ msgCrypto.encrypt(Collections.singleton(keyName), cryptoKeyReader,
+ () -> ignoredMessageMetadata, unEncryptedMessagePayload,
encryptedMessagePayload);
+ byte[] res = new byte[encryptedMessagePayload.remaining()];
+ encryptedMessagePayload.get(res);
+ return new
EncryptedPayloadAndParam(WssClientSideEncryptUtils.base64Encode(res),
+
WssClientSideEncryptUtils.base64Encode(ignoredMessageMetadata.getEncryptionParam()));
+ }
+
+ @AllArgsConstructor
+ public static class EncryptedPayloadAndParam {
+ public final String encryptedPayload;
+ public final String encryptionParam;
+ }
+
+ private static final FastThreadLocal<SingleMessageMetadata>
LOCAL_SINGLE_MESSAGE_METADATA =
+ new FastThreadLocal<>() {
+ @Override
+ protected SingleMessageMetadata initialValue() throws
Exception {
+ return new SingleMessageMetadata();
+ }
+ };
+
+ public static byte[] compositeBatchMessage(List<ProducerMessage> messages)
{
Review Comment:
We should remove it since it's no longer in use.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]