poorbarcode commented on code in PR #20958:
URL: https://github.com/apache/pulsar/pull/20958#discussion_r1301138803


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
 
     // Base64 encoded serialized schema for payload
     public String valueSchema;
+
+    // Compression type. Do not set it if compression is not performed.

Review Comment:
   Fixed



##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
 
     // Base64 encoded serialized schema for payload
     public String valueSchema;
+
+    // Compression type. Do not set it if compression is not performed.
+    public String encryptionParam;
+
+    // The size of the payload before compression. Do not set it if 
compression is not performed.
+    public CompressionType compressionType;
+
+    // Base64 encoded serialized initialization vector used when the client 
encrypts.

Review Comment:
   Fixed



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1739,7 +1739,7 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData 
messageId, int redeliveryCo
         if (conf.getCryptoKeyReader() == null) {
             switch (conf.getCryptoFailureAction()) {
                 case CONSUME:
-                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
+                    log.info("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",

Review Comment:
   Done



##########
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECDSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECIES;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA_TRANS;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PublicKey;
+import java.security.spec.AlgorithmParameterSpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+@Slf4j
+public class WssClientSideEncryptUtils {
+
+    public static Charset UTF8 = StandardCharsets.UTF_8;
+
+    public static String base64AndUrlEncode(String str) {
+        return base64AndUrlEncode(str.getBytes(UTF8), UTF8);
+    }
+
+    public static String base64AndUrlEncode(String str, Charset charset) {
+        return base64AndUrlEncode(str.getBytes(charset), charset);
+    }
+
+    public static String base64Encode(String str, Charset charset) {
+        return Base64.getEncoder().encodeToString(str.getBytes(charset));
+    }
+
+    public static String base64Encode(String str) {
+        return Base64.getEncoder().encodeToString(str.getBytes(UTF8));
+    }
+
+    public static byte[] base64Decode(String str) {
+        return Base64.getDecoder().decode(str);
+    }
+
+    public static String base64Encode(byte[] byteArray) {
+        return Base64.getEncoder().encodeToString(byteArray);
+    }
+
+    public static String base64AndUrlEncode(byte[] byteArray) {
+        String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+        return URLEncoder.encode(base64Encode, UTF8);
+    }
+
+    public static String base64AndUrlEncode(byte[] byteArray, Charset charset) 
{
+        String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+        return URLEncoder.encode(base64Encode, charset);
+    }
+
+    public static String urlEncode(String str) {
+        return URLEncoder.encode(str, UTF8);
+    }
+
+    public static String urlEncode(String str, Charset charset) {
+        return URLEncoder.encode(str, charset);
+    }
+
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, 
CryptoKeyReader cryptoKeyReader,
+                                                    String publicKeyName)
+            throws PulsarClientException.CryptoException {
+        EncryptionKeyInfo encryptionKeyInfo = 
cryptoKeyReader.getPublicKey(publicKeyName, Collections.emptyMap());
+        return calculateEncryptedKeyValue(msgCrypto, 
encryptionKeyInfo.getKey());
+    }
+
+    public static String toJSONAndBase64AndUrlEncode(Object obj)
+            throws PulsarClientException.CryptoException {
+        try {
+            String json = ObjectMapperFactory.getMapper().getObjectMapper()
+                    .writeValueAsString(obj);
+            return urlEncode(base64Encode(json));
+        } catch (JsonProcessingException e) {
+            throw new 
PulsarClientException.CryptoException(String.format("Serialize object %s 
failed", obj));
+        }
+    }
+
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, 
EncryptionKeyInfo encryptionKeyInfo)
+            throws Exception {
+        return calculateEncryptedKeyValue(msgCrypto, 
encryptionKeyInfo.getKey());
+    }
+
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, 
byte[] publicKeyData)
+            throws PulsarClientException.CryptoException {
+        try {
+            PublicKey pubKey = MessageCryptoBc.loadPublicKey(publicKeyData);
+            Cipher dataKeyCipher = loadAndInitCipher(pubKey);
+            return dataKeyCipher.doFinal(msgCrypto.getDataKey().getEncoded());
+        } catch (Exception e) {
+            log.error("Failed to encrypt data key. {}", e.getMessage());
+            throw new PulsarClientException.CryptoException(e.getMessage());
+        }
+    }
+
+    private static Cipher loadAndInitCipher(PublicKey pubKey) throws 
PulsarClientException.CryptoException,
+            NoSuchAlgorithmException, NoSuchProviderException, 
NoSuchPaddingException, InvalidKeyException,
+            InvalidAlgorithmParameterException {
+        Cipher dataKeyCipher = null;
+        AlgorithmParameterSpec params = null;
+        // Encrypt data key using public key
+        if (RSA.equals(pubKey.getAlgorithm())) {
+            dataKeyCipher = Cipher.getInstance(RSA_TRANS, 
BouncyCastleProvider.PROVIDER_NAME);
+        } else if (ECDSA.equals(pubKey.getAlgorithm())) {
+            dataKeyCipher = Cipher.getInstance(ECIES, 
BouncyCastleProvider.PROVIDER_NAME);
+            params = MessageCryptoBc.createIESParameterSpec();
+        } else {
+            String msg =  "Unsupported key type " + pubKey.getAlgorithm();
+            log.error(msg);
+            throw new PulsarClientException.CryptoException(msg);
+        }
+        if (params != null) {
+            dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey, params);
+        } else {
+            dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey);
+        }
+        return dataKeyCipher;
+    }
+
+    public static byte[] compressionIfNeeded(CompressionType compressionType, 
byte[] payload) {
+        if (compressionType != null && 
!CompressionType.NONE.equals(compressionType)) {
+            CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+            ByteBuf input = 
PulsarByteBufAllocator.DEFAULT.buffer(payload.length, payload.length);
+            input.writeBytes(payload);
+            ByteBuf output = codec.encode(input);
+            input.release();
+            byte[] res = new byte[output.readableBytes()];
+            output.readBytes(res);
+            output.release();
+            return res;
+        }
+        return payload;
+    }
+
+    public static EncryptedPayloadAndParam encryptPayload(CryptoKeyReader 
cryptoKeyReader, MessageCryptoBc msgCrypto,
+                                                           byte[] payload, 
String keyName)
+            throws PulsarClientException {
+        ByteBuffer unEncryptedMessagePayload = ByteBuffer.wrap(payload);
+        ByteBuffer encryptedMessagePayload = 
ByteBuffer.allocate(unEncryptedMessagePayload.remaining() + 512);
+        MessageMetadata ignoredMessageMetadata = new MessageMetadata();
+        msgCrypto.encrypt(Collections.singleton(keyName), cryptoKeyReader,
+                () -> ignoredMessageMetadata, unEncryptedMessagePayload, 
encryptedMessagePayload);
+        byte[] res = new byte[encryptedMessagePayload.remaining()];
+        encryptedMessagePayload.get(res);
+        return new 
EncryptedPayloadAndParam(WssClientSideEncryptUtils.base64Encode(res),
+                
WssClientSideEncryptUtils.base64Encode(ignoredMessageMetadata.getEncryptionParam()));
+    }
+
+    @AllArgsConstructor
+    public static class EncryptedPayloadAndParam {
+        public final String encryptedPayload;
+        public final String encryptionParam;
+    }
+
+    private static final FastThreadLocal<SingleMessageMetadata> 
LOCAL_SINGLE_MESSAGE_METADATA =
+            new FastThreadLocal<>() {
+                @Override
+                protected SingleMessageMetadata initialValue() throws 
Exception {
+                    return new SingleMessageMetadata();
+                }
+            };
+
+    public static byte[] compositeBatchMessage(List<ProducerMessage> messages) 
{

Review Comment:
   It was removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to