This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07eef59e16b [feat][ws] PIP-290 Make WSS support E2E encryption (#20958)
07eef59e16b is described below

commit 07eef59e16bca11788c2ec63c5f61d3bdcadb10d
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 25 15:57:34 2023 +0800

    [feat][ws] PIP-290 Make WSS support E2E encryption (#20958)
    
    See PIP: https://github.com/apache/pulsar/pull/20923
---
 .../proxy/ClientSideEncryptionWssConsumer.java     | 154 +++++++++
 .../proxy/ClientSideEncryptionWssProducer.java     | 200 ++++++++++++
 .../websocket/proxy/CryptoKeyReaderForTest.java    |  77 +++++
 ...roxyPublishConsumeClientSideEncryptionTest.java | 360 +++++++++++++++++++++
 .../websocket/proxy/WssClientSideEncryptUtils.java | 280 ++++++++++++++++
 .../client/api/DummyCryptoKeyReaderImpl.java       |  41 +++
 .../pulsar/client/impl/crypto/MessageCryptoBc.java |  16 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  11 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   |   2 +
 .../apache/pulsar/websocket/ProducerHandler.java   | 205 ++++++++++--
 .../pulsar/websocket/data/ProducerMessage.java     |  10 +
 .../service/WSSDummyMessageCryptoImpl.java         |  77 +++++
 .../pulsar/websocket/ProducerHandlerTest.java      |   3 +-
 14 files changed, 1404 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java
new file mode 100644
index 00000000000..ab8372cbfb0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.testng.Assert.assertTrue;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * Test-only WebSocket consumer that handles encrypted messages on the client side.
+ *
+ * <p>It connects to the consumer endpoint of the WebSocket proxy. Every received text frame is decrypted,
+ * un-compressed and (for batches) split into single messages through {@link WssClientSideEncryptUtils}; the
+ * resulting messages are queued until {@link #receive(int, TimeUnit)} is called.
+ */
+@Slf4j
+@WebSocket(maxTextMessageSize = 64 * 1024)
+public class ClientSideEncryptionWssConsumer extends WebSocketAdapter implements Closeable {
+
+    private Session session;
+    private final CryptoKeyReader cryptoKeyReader;
+    private final String topicName;
+    private final String subscriptionName;
+    private final SubscriptionType subscriptionType;
+    private final String webSocketProxyHost;
+    private final int webSocketProxyPort;
+    private WebSocketClient wssClient;
+    private final MessageCryptoBc msgCrypto;
+    /** Messages that were decrypted successfully and are waiting to be handed out by {@code receive}. */
+    private final LinkedBlockingQueue<ConsumerMessage> incomingMessages = new LinkedBlockingQueue<>();
+
+    /**
+     * @param webSocketProxyHost host of the WebSocket proxy
+     * @param webSocketProxyPort port of the WebSocket proxy
+     * @param topicName topic to consume, appended after {@code /ws/v2/consumer/persistent/}
+     * @param subscriptionName subscription to use
+     * @param subscriptionType subscription type passed as a query parameter
+     * @param cryptoKeyReader key reader handed to the decryption utility
+     */
+    public ClientSideEncryptionWssConsumer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
+                                           String subscriptionName, SubscriptionType subscriptionType,
+                                           CryptoKeyReader cryptoKeyReader) {
+        this.webSocketProxyHost = webSocketProxyHost;
+        this.webSocketProxyPort = webSocketProxyPort;
+        this.topicName = topicName;
+        this.subscriptionName = subscriptionName;
+        this.subscriptionType = subscriptionType;
+        this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + subscriptionName + "]", false);
+        this.cryptoKeyReader = cryptoKeyReader;
+    }
+
+    /** Starts the WebSocket client, connects to the proxy and asserts that the session is open. */
+    public void start() throws Exception {
+        wssClient = new WebSocketClient();
+        wssClient.start();
+        session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
+        assertTrue(session.isOpen());
+    }
+
+    private URI buildConnectURL() throws PulsarClientException.CryptoException {
+        final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;
+
+        // Build the URL for consumer.
+        // NOTE(review): "cryptoFailureAction=CONSUME" presumably makes the proxy deliver the still-encrypted
+        // payload so that it can be decrypted here - confirm against ConsumerHandler.
+        final StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
+                .append("/ws/v2/consumer/persistent/")
+                .append(topicName)
+                .append("/")
+                .append(subscriptionName)
+                .append("?")
+                .append("subscriptionType=").append(subscriptionType.toString())
+                .append("&").append("cryptoFailureAction=CONSUME");
+        return URI.create(consumerUri.toString());
+    }
+
+    /**
+     * Waits up to the given timeout for the next decrypted message.
+     *
+     * @return the next message, or {@code null} if none arrived in time
+     */
+    public synchronized ConsumerMessage receive(int timeout, TimeUnit unit) throws Exception {
+        return incomingMessages.poll(timeout, unit);
+    }
+
+    @Override
+    public void onWebSocketClose(int statusCode, String reason) {
+        log.info("Connection closed: {} - {}", statusCode, reason);
+        this.session = null;
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        log.info("Got connect: {}", session);
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+        log.error("Received an error", cause);
+    }
+
+    /**
+     * Parses a frame pushed by the proxy, then decrypts, un-compresses and un-batches its payload before
+     * queueing the result. Frames that cannot be processed are logged and dropped.
+     */
+    @Override
+    public void onWebSocketText(String text) {
+
+        try {
+            ConsumerMessage msg =
+                    ObjectMapperFactory.getMapper().reader().readValue(text, ConsumerMessage.class);
+            if (msg.messageId == null) {
+                log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName, subscriptionName,
+                        text);
+                return;
+            }
+            // Decrypt.
+            byte[] decryptedPayload = WssClientSideEncryptUtils.decryptMsgPayload(msg.payload, msg.encryptionContext,
+                    cryptoKeyReader, msgCrypto);
+            // Un-compression if needed.
+            byte[] unCompressedPayload = WssClientSideEncryptUtils.unCompressionIfNeeded(decryptedPayload,
+                    msg.encryptionContext);
+            // Extract batch messages if needed.
+            if (msg.encryptionContext.getBatchSize().isPresent()) {
+                List<ConsumerMessage> singleMsgs = WssClientSideEncryptUtils.extractBatchMessagesIfNeeded(
+                        unCompressedPayload, msg.encryptionContext);
+                incomingMessages.addAll(singleMsgs);
+            } else {
+                msg.payload = new String(unCompressedPayload, StandardCharsets.UTF_8);
+                incomingMessages.add(msg);
+            }
+        } catch (Exception ex) {
+            // Pass the exception as the last argument so that the cause and stack trace are not lost.
+            log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName, subscriptionName,
+                    text, ex);
+        }
+    }
+
+    /** Stops the underlying WebSocket client; does nothing if {@link #start()} was never called. */
+    @Override
+    public void close() throws IOException {
+        if (wssClient == null) {
+            return;
+        }
+        try {
+            wssClient.stop();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java
new file mode 100644
index 00000000000..f9ae6cd4344
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.testng.Assert.assertTrue;
+import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import static 
org.apache.pulsar.websocket.proxy.WssClientSideEncryptUtils.EncryptedPayloadAndParam;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * Test-only WebSocket producer that compresses and encrypts messages on the client side.
+ *
+ * <p>The encrypted data key is passed to the proxy through the {@code encryptionKeys} query parameter when
+ * connecting. Each message is compressed (if requested) and encrypted before it is sent as JSON, and
+ * {@link #sendMessage(ProducerMessage)} blocks until the proxy answers, the connection closes or the send
+ * times out. Only one message can be in flight at a time.
+ */
+@Slf4j
+@WebSocket(maxTextMessageSize = 64 * 1024)
+public class ClientSideEncryptionWssProducer extends WebSocketAdapter implements Closeable {
+
+    private Session session;
+    /** Future of the message currently in flight; {@code null} until the first send. */
+    private volatile CompletableFuture<MessageIdData> sendFuture;
+    private final ScheduledExecutorService executor;
+    private final CryptoKeyReader cryptoKeyReader;
+    private final String topicName;
+    private final String producerName;
+    private final String webSocketProxyHost;
+    private final int webSocketProxyPort;
+    private final String keyName;
+    private WebSocketClient wssClient;
+    private final MessageCryptoBc msgCrypto;
+
+    /**
+     * @param webSocketProxyHost host of the WebSocket proxy
+     * @param webSocketProxyPort port of the WebSocket proxy
+     * @param topicName topic to publish to, appended after {@code /ws/v2/producer/persistent/}
+     * @param producerName name used to label the crypto helper
+     * @param cryptoKeyReader key reader used to load the public key
+     * @param keyName name of the encryption key
+     * @param executor executor used to schedule the send timeout
+     */
+    public ClientSideEncryptionWssProducer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
+                                           String producerName, CryptoKeyReader cryptoKeyReader, String keyName,
+                                           ScheduledExecutorService executor) {
+        this.webSocketProxyHost = webSocketProxyHost;
+        this.webSocketProxyPort = webSocketProxyPort;
+        this.topicName = topicName;
+        this.producerName = producerName;
+        this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + producerName + "]", true);
+        this.cryptoKeyReader = cryptoKeyReader;
+        this.keyName = keyName;
+        this.executor = executor;
+    }
+
+    /** Starts the WebSocket client, connects to the proxy and asserts that the session is open. */
+    public void start() throws Exception {
+        wssClient = new WebSocketClient();
+        wssClient.start();
+        session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
+        assertTrue(session.isOpen());
+    }
+
+    private URI buildConnectURL() throws PulsarClientException.CryptoException {
+        final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;
+
+        // Encode encrypted public key data.
+        final byte[] keyValue = WssClientSideEncryptUtils.calculateEncryptedKeyValue(msgCrypto, cryptoKeyReader,
+                keyName);
+        EncryptionKey encryptionKey = new EncryptionKey();
+        encryptionKey.setKeyValue(keyValue);
+        encryptionKey.setMetadata(cryptoKeyReader.getPublicKey(keyName, Collections.emptyMap()).getMetadata());
+        Map<String, EncryptionKey> encryptionKeyMap = new HashMap<>();
+        encryptionKeyMap.put(keyName, encryptionKey);
+
+        final String encryptionKeys =
+                WssClientSideEncryptUtils.toJSONAndBase64AndUrlEncode(encryptionKeyMap);
+
+        // Build the URL for producer.
+        final StringBuilder producerUrl = new StringBuilder(protocolAndHostPort)
+                .append("/ws/v2/producer/persistent/")
+                .append(topicName)
+                .append("?")
+                .append("encryptionKeys=").append(encryptionKeys);
+        return URI.create(producerUrl.toString());
+    }
+
+    /**
+     * Compresses and encrypts the payload of {@code msg}, sends it and waits for the response of the proxy.
+     *
+     * <p>{@code msg} is modified in place: its payload is replaced by the encrypted payload and its encryption
+     * parameter (and, when compressed, its uncompressed size) is filled in.
+     *
+     * @return the message id reported by the proxy
+     * @throws IllegalArgumentException if another message is still in flight or the payload is {@code null}
+     * @throws IllegalStateException if the producer is not connected
+     * @throws java.util.concurrent.ExecutionException if the proxy reports an error, the connection is closed
+     *         or no response arrives within 50 seconds
+     */
+    public synchronized MessageIdData sendMessage(ProducerMessage msg) throws Exception {
+        if (sendFuture != null && !sendFuture.isDone()) {
+            throw new IllegalArgumentException("There is a message still in sending.");
+        }
+        if (msg.payload == null) {
+            throw new IllegalArgumentException("Null value message is not supported.");
+        }
+        final Session currentSession = this.session;
+        if (currentSession == null) {
+            throw new IllegalStateException("The producer is not connected.");
+        }
+        // Compression.
+        byte[] unCompressedPayload = msg.payload.getBytes(StandardCharsets.UTF_8);
+        byte[] compressedPayload = WssClientSideEncryptUtils.compressionIfNeeded(msg.compressionType,
+                unCompressedPayload);
+        if (msg.compressionType != null && !CompressionType.NONE.equals(msg.compressionType)) {
+            msg.uncompressedMessageSize = unCompressedPayload.length;
+        }
+        // Encrypt.
+        EncryptedPayloadAndParam encryptedPayloadAndParam = WssClientSideEncryptUtils.encryptPayload(
+                cryptoKeyReader, msgCrypto, compressedPayload, keyName);
+        msg.payload = encryptedPayloadAndParam.encryptedPayload;
+        msg.encryptionParam = encryptedPayloadAndParam.encryptionParam;
+        // Do send.
+        final String jsonMsg = ObjectMapperFactory.getMapper().writer().writeValueAsString(msg);
+        final CompletableFuture<MessageIdData> future = new CompletableFuture<>();
+        sendFuture = future;
+        try {
+            currentSession.getRemote().sendString(jsonMsg);
+        } catch (Exception e) {
+            // Do not leave a pending future behind, otherwise every later send would be rejected.
+            future.completeExceptionally(e);
+            throw e;
+        }
+        // Wait for response. The timeout task works on the captured future and takes no lock: this method
+        // holds the monitor of "this" while it blocks below, so a task that synchronizes on "this" could never
+        // run in time, and reading the field instead could fail a later send.
+        executor.schedule(() -> {
+            future.completeExceptionally(new TimeoutException("Send timeout"));
+        }, 50, TimeUnit.SECONDS);
+        return future.get();
+    }
+
+    @Override
+    public void onWebSocketClose(int statusCode, String reason) {
+        log.info("Connection closed: {} - {}", statusCode, reason);
+        this.session = null;
+        // The connection may be closed before any message was sent, in which case there is no future yet.
+        final CompletableFuture<MessageIdData> future = sendFuture;
+        if (future != null) {
+            future.completeExceptionally(new RuntimeException("Connection was closed"));
+        }
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        log.info("Got connect: {}", session);
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+        log.error("Received an error", cause);
+    }
+
+    /** Completes the in-flight send with the message id, or with an error, carried by the response. */
+    @Override
+    public void onWebSocketText(String text) {
+        final CompletableFuture<MessageIdData> future = sendFuture;
+        if (future == null) {
+            log.warn("Received a response while no message is in flight: {}", text);
+            return;
+        }
+        try {
+            ResponseOfSend responseOfSend =
+                    ObjectMapperFactory.getMapper().reader().readValue(text, ResponseOfSend.class);
+            if (responseOfSend.getErrorCode() != 0 || responseOfSend.getErrorMsg() != null) {
+                future.completeExceptionally(new RuntimeException(text));
+            } else {
+                byte[] bytes = Base64.getDecoder().decode(responseOfSend.getMessageId());
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.parseFrom(bytes);
+                future.complete(messageIdData);
+            }
+        } catch (Exception ex) {
+            log.error("Could not extract the response payload: {}", text, ex);
+            // Fail fast instead of letting the sender wait for the timeout.
+            future.completeExceptionally(ex);
+        }
+    }
+
+    /** Stops the underlying WebSocket client; does nothing if {@link #start()} was never called. */
+    @Override
+    public void close() throws IOException {
+        if (wssClient == null) {
+            return;
+        }
+        try {
+            wssClient.stop();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * JSON shape of the response to a send. {@code errorCode} defaults to {@code -1}, so a response that does
+     * not carry an {@code errorCode} of {@code 0} is treated as a failure by {@code onWebSocketText}.
+     */
+    @Data
+    public static class ResponseOfSend {
+        private String result;
+        private String messageId;
+        private String errorMsg;
+        private int errorCode = -1;
+        private int schemaVersion;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java
new file mode 100644
index 00000000000..ff9cde766df
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.testng.Assert;
+
+/**
+ * {@link CryptoKeyReader} for tests that loads keys from {@code ./src/test/resources/certificate/} and
+ * attaches {@link #RANDOM_METADATA} to every key it returns. The test fails if a key file cannot be read.
+ */
+public class CryptoKeyReaderForTest implements CryptoKeyReader {
+
+    /** Random key/value pairs generated once per class load and attached to every returned key. */
+    public static final Map<String, String> RANDOM_METADATA = new HashMap<>();
+
+    static {
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+    }
+
+    /**
+     * Loads {@code public-key.<keyName>}.
+     *
+     * @param keyName suffix of the key file name
+     * @param metadata ignored
+     */
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+        return readKey("./src/test/resources/certificate/public-key." + keyName);
+    }
+
+    /**
+     * Loads {@code private-key.<keyName>}.
+     *
+     * @param keyName suffix of the key file name
+     * @param metadata ignored
+     */
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+        return readKey("./src/test/resources/certificate/private-key." + keyName);
+    }
+
+    /** Reads the key file at the given path, failing the running test if it is missing or unreadable. */
+    private static EncryptionKeyInfo readKey(String certFilePath) {
+        if (!Files.isReadable(Paths.get(certFilePath))) {
+            Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
+        }
+        try {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+            keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+            // The metadata is meaningless, just to test that it can be transferred to the consumer.
+            keyInfo.setMetadata(RANDOM_METADATA);
+            return keyInfo;
+        } catch (IOException e) {
+            // Keep the cause so that the reason of the read failure shows up in the test report.
+            Assert.fail("Failed to read certificate from " + certFilePath, e);
+        }
+        // Not reached: Assert.fail always throws, but the compiler cannot know that.
+        return null;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
new file mode 100644
index 00000000000..ad69df4757f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "websocket")
+public class ProxyPublishConsumeClientSideEncryptionTest extends 
ProducerConsumerBase {
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    // ClientSideEncryptionWssProducer encodes payloads as UTF-8, so decode with UTF-8 as well instead of
+    // relying on the platform default charset.
+    private static final Charset charset = StandardCharsets.UTF_8;
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    /**
+     * Starts the broker through the base class and then a WebSocket proxy on a random port whose
+     * configuration metadata store is backed by the mocked global ZooKeeper.
+     */
+    @BeforeClass
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        // Assign the field rather than a shadowing local variable, otherwise cleanup() sees a null service
+        // and never closes it.
+        service = spy(new WebSocketService(config));
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    /**
+     * Tears down the broker, then closes the WebSocket service and stops the proxy server. The later steps
+     * run even if an earlier one throws, so a failing teardown does not leak the proxy.
+     */
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            try {
+                if (service != null) {
+                    service.close();
+                }
+            } finally {
+                if (proxyServer != null) {
+                    proxyServer.stop();
+                }
+            }
+        }
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    /** Supplies the key names the encryption tests run with, one per invocation. */
+    @DataProvider(name = "encryptKeyNames")
+    public Object[][] encryptKeyNames() {
+        final Object[] ecdsaCase = {"client-ecdsa.pem"};
+        final Object[] rsaCase = {"client-rsa.pem"};
+        return new Object[][]{ecdsaCase, rsaCase};
+    }
+
+    @Test(dataProvider = "encryptKeyNames")
+    public void testWssSendAndJavaConsumeWithEncryption(String keyName) throws 
Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("public/default/tp_");
+        final String subscriptionName = "s1";
+        final String producerName = "wss-p1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        // Create wss producer.
+        final String webSocketProxyHost = "localhost";
+        final int webSocketProxyPort = proxyServer.getListenPortHTTP().get();
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer producer = new 
ClientSideEncryptionWssProducer(webSocketProxyHost,
+                webSocketProxyPort, topicName, producerName, cryptoKeyReader, 
keyName, executor);
+        producer.start();
+
+        // Send message.
+        String msgPayloadBeforeEncrypt = "msg-123";
+        ProducerMessage messageSent = new ProducerMessage();
+        messageSent.key = "k";
+        messageSent.payload = msgPayloadBeforeEncrypt;
+        MessageIdData messageIdData = producer.sendMessage(messageSent);
+        log.info("send success: {}", messageIdData.toString());
+
+        // Consume.
+        Consumer consumer = 
pulsarClient.newConsumer().cryptoKeyReader(cryptoKeyReader)
+                .topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Message msgReceived = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(new String(msgReceived.getData(), charset), 
msgPayloadBeforeEncrypt);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    /** Supplies the compression types used by the WebSocket producer tests, one per invocation. */
+    @DataProvider(name = "compressionTypes")
+    public Object[][] compressionTypes() {
+        final CompressionType[] types = {
+                CompressionType.NONE,
+                CompressionType.LZ4,
+                CompressionType.ZLIB,
+                CompressionType.SNAPPY,
+                CompressionType.ZSTD
+        };
+        final Object[][] cases = new Object[types.length][];
+        for (int i = 0; i < types.length; i++) {
+            cases[i] = new Object[]{types[i]};
+        }
+        return cases;
+    }
+
+    @Test(dataProvider = "compressionTypes")
+    public void 
testWssSendAndJavaConsumeWithEncryptionAndCompression(CompressionType 
compressionType)
+            throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("public/default/tp_");
+        final String subscriptionName = "s1";
+        final String producerName = "wss-p1";
+        final String keyName = "client-ecdsa.pem";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscriptionName, 
MessageId.earliest);
+
+        // Create wss producer.
+        final String webSocketProxyHost = "localhost";
+        final int webSocketProxyPort = proxyServer.getListenPortHTTP().get();
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer producer = new 
ClientSideEncryptionWssProducer(webSocketProxyHost,
+                webSocketProxyPort, topicName, producerName, cryptoKeyReader, 
keyName, executor);
+        producer.start();
+
+        // Send message.
+        String originalPayload = "msg-123";
+        ProducerMessage messageSent = new ProducerMessage();
+        messageSent.key = "k";
+        messageSent.payload = originalPayload;
+        messageSent.compressionType = compressionType;
+        MessageIdData messageIdData = producer.sendMessage(messageSent);
+        log.info("send success: {}", messageIdData.toString());
+
+        // Consume.
+        Consumer consumer = 
pulsarClient.newConsumer().cryptoKeyReader(cryptoKeyReader)
+                .topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Message msgReceived = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(new String(msgReceived.getData(), charset), 
originalPayload);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    /**
+     * Publishes one encrypted message with the Java client, then reads it back through a
+     * {@link ClientSideEncryptionWssConsumer} that is given the same key reader, and checks the payload.
+     */
+    @Test(dataProvider = "encryptKeyNames")
+    public void testJavaSendAndWssConsumeWithEncryption(String keyName) throws Exception {
+        final String expectedPayload = "msg-123";
+        final String subscription = "s1";
+        final String topic = BrokerTestUtil.newUniqueName("public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+        final CryptoKeyReader keyReader = new CryptoKeyReaderForTest();
+
+        // Publish through the Java client with encryption enabled.
+        Producer<byte[]> javaProducer = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey(keyName)
+                .cryptoKeyReader(keyReader)
+                .create();
+        javaProducer.send(expectedPayload.getBytes(StandardCharsets.UTF_8));
+
+        // Attach a WebSocket consumer to the pre-created subscription.
+        final String proxyHost = "localhost";
+        final int proxyPort = proxyServer.getListenPortHTTP().get();
+        ClientSideEncryptionWssConsumer wssConsumer = new ClientSideEncryptionWssConsumer(
+                proxyHost, proxyPort, topic, subscription, SubscriptionType.Shared, keyReader);
+        wssConsumer.start();
+
+        // The payload seen by the WebSocket consumer must match what was published.
+        ConsumerMessage received = wssConsumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(received.payload, expectedPayload);
+
+        // cleanup.
+        javaProducer.close();
+        wssConsumer.close();
+        admin.topics().delete(topic);
+    }
+
+    /**
+     * Supplies the Java-client compression types exercised by the compression tests, one per row.
+     */
+    @DataProvider(name = "compressionTypesForJ")
+    public Object[][] compressionTypesForJ() {
+        final org.apache.pulsar.client.api.CompressionType[] types = {
+                org.apache.pulsar.client.api.CompressionType.NONE,
+                org.apache.pulsar.client.api.CompressionType.LZ4,
+                org.apache.pulsar.client.api.CompressionType.ZLIB,
+                org.apache.pulsar.client.api.CompressionType.SNAPPY,
+                org.apache.pulsar.client.api.CompressionType.ZSTD
+        };
+        final Object[][] rows = new Object[types.length][];
+        for (int i = 0; i < types.length; i++) {
+            rows[i] = new Object[]{types[i]};
+        }
+        return rows;
+    }
+
+    /**
+     * Publishes one encrypted and compressed message with the Java client, then reads it back through a
+     * {@link ClientSideEncryptionWssConsumer} and checks that the original payload is recovered.
+     */
+    @Test(dataProvider = "compressionTypesForJ")
+    public void testJavaSendAndWssConsumeWithEncryptionAndCompression(
+            org.apache.pulsar.client.api.CompressionType compressionType) throws Exception {
+        final String expectedPayload = "msg-123";
+        final String encryptionKeyName = "client-ecdsa.pem";
+        final String subscription = "s1";
+        final String topic = BrokerTestUtil.newUniqueName("public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+        final CryptoKeyReader keyReader = new CryptoKeyReaderForTest();
+
+        // Publish through the Java client with both encryption and compression enabled.
+        Producer<byte[]> javaProducer = pulsarClient.newProducer()
+                .topic(topic)
+                .addEncryptionKey(encryptionKeyName)
+                .compressionType(compressionType)
+                .cryptoKeyReader(keyReader)
+                .create();
+        javaProducer.send(expectedPayload.getBytes(StandardCharsets.UTF_8));
+
+        // Attach a WebSocket consumer to the pre-created subscription.
+        final String proxyHost = "localhost";
+        final int proxyPort = proxyServer.getListenPortHTTP().get();
+        ClientSideEncryptionWssConsumer wssConsumer = new ClientSideEncryptionWssConsumer(
+                proxyHost, proxyPort, topic, subscription, SubscriptionType.Shared, keyReader);
+        wssConsumer.start();
+
+        // The payload seen by the WebSocket consumer must match what was published.
+        ConsumerMessage received = wssConsumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(received.payload, expectedPayload);
+
+        // cleanup.
+        javaProducer.close();
+        wssConsumer.close();
+        admin.topics().delete(topic);
+    }
+
+    /**
+     * Publishes ten messages as an encrypted, LZ4-compressed batch with the Java client, then drains them
+     * through a {@link ClientSideEncryptionWssConsumer} and checks that the same set of payloads comes back.
+     */
+    @Test
+    public void testJavaSendAndWssConsumeWithEncryptionAndCompressionAndBatch() throws Exception {
+        final String encryptionKeyName = "client-ecdsa.pem";
+        final String subscription = "s1";
+        final String topic = BrokerTestUtil.newUniqueName("public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+        final CryptoKeyReader keyReader = new CryptoKeyReaderForTest();
+
+        // The batch limits are far above what is sent, so nothing is published before the explicit flush.
+        final HashSet<String> expectedPayloads = new HashSet<>();
+        Producer<byte[]> javaProducer = pulsarClient.newProducer()
+                .enableBatching(true)
+                .batchingMaxMessages(1000)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .topic(topic)
+                .addEncryptionKey(encryptionKeyName)
+                .compressionType(org.apache.pulsar.client.api.CompressionType.LZ4)
+                .cryptoKeyReader(keyReader)
+                .create();
+        int index = 0;
+        while (index < 10) {
+            final String payload = "msg-" + index;
+            expectedPayloads.add(payload);
+            javaProducer.sendAsync(payload.getBytes(StandardCharsets.UTF_8));
+            index++;
+        }
+        javaProducer.flush();
+
+        // Attach a WebSocket consumer to the pre-created subscription.
+        final String proxyHost = "localhost";
+        final int proxyPort = proxyServer.getListenPortHTTP().get();
+        ClientSideEncryptionWssConsumer wssConsumer = new ClientSideEncryptionWssConsumer(
+                proxyHost, proxyPort, topic, subscription, SubscriptionType.Shared, keyReader);
+        wssConsumer.start();
+
+        // Drain until a receive call times out and returns null.
+        final HashSet<String> actualPayloads = new HashSet<>();
+        for (ConsumerMessage next = wssConsumer.receive(2, TimeUnit.SECONDS);
+             next != null;
+             next = wssConsumer.receive(2, TimeUnit.SECONDS)) {
+            actualPayloads.add(next.payload);
+        }
+        assertEquals(actualPayloads, expectedPayloads);
+
+        // cleanup.
+        javaProducer.close();
+        wssConsumer.close();
+        admin.topics().delete(topic);
+    }
+
+
+
+    /**
+     * Publishes one message through a {@link ClientSideEncryptionWssProducer} and reads it back through a
+     * {@link ClientSideEncryptionWssConsumer}, checking both the payload and the metadata attached to the
+     * encryption key.
+     */
+    @Test(dataProvider = "encryptKeyNames")
+    public void testWssSendAndWssConsumeWithEncryption(String keyName) throws Exception {
+        final String producerName = "wss-p1";
+        final String subscription = "s1";
+        final String topic = BrokerTestUtil.newUniqueName("public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createSubscription(topic, subscription, MessageId.earliest);
+
+        // Start the WebSocket producer.
+        final String proxyHost = "localhost";
+        final int proxyPort = proxyServer.getListenPortHTTP().get();
+        final CryptoKeyReader keyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer wssProducer = new ClientSideEncryptionWssProducer(
+                proxyHost, proxyPort, topic, producerName, keyReader, keyName, executor);
+        wssProducer.start();
+
+        // Publish a single keyed message.
+        final String plainPayload = "msg-123";
+        ProducerMessage outgoing = new ProducerMessage();
+        outgoing.key = "k";
+        outgoing.payload = plainPayload;
+        MessageIdData publishedId = wssProducer.sendMessage(outgoing);
+        log.info("send success: {}", publishedId.toString());
+
+        // Read it back through the WebSocket consumer.
+        ClientSideEncryptionWssConsumer wssConsumer = new ClientSideEncryptionWssConsumer(
+                proxyHost, proxyPort, topic, subscription, SubscriptionType.Shared, keyReader);
+        wssConsumer.start();
+        ConsumerMessage incoming = wssConsumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(incoming.payload, plainPayload);
+        assertEquals(incoming.encryptionContext.getKeys().get(keyName).getMetadata(),
+                CryptoKeyReaderForTest.RANDOM_METADATA);
+
+        // cleanup.
+        wssProducer.close();
+        wssConsumer.close();
+        admin.topics().delete(topic);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
new file mode 100644
index 00000000000..dbbfb756625
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECDSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECIES;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA_TRANS;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PublicKey;
+import java.security.spec.AlgorithmParameterSpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+/**
+ * Stateless helpers shared by the WebSocket client-side-encryption tests: Base64/URL encoding, wrapping of the
+ * data key with a public key, payload compression, payload encryption/decryption and batch unpacking.
+ */
+@Slf4j
+public class WssClientSideEncryptUtils {
+
+    /** Charset used whenever a caller does not pass one explicitly. */
+    public static final Charset UTF8 = StandardCharsets.UTF_8;
+
+    /** Base64-encodes the UTF-8 bytes of {@code str}, then URL-encodes the result. */
+    public static String base64AndUrlEncode(String str) {
+        return base64AndUrlEncode(str, UTF8);
+    }
+
+    /** Base64-encodes the bytes of {@code str} in {@code charset}, then URL-encodes the result with it. */
+    public static String base64AndUrlEncode(String str, Charset charset) {
+        return base64AndUrlEncode(str.getBytes(charset), charset);
+    }
+
+    /** Base64-encodes the bytes of {@code str} in {@code charset}. */
+    public static String base64Encode(String str, Charset charset) {
+        return base64Encode(str.getBytes(charset));
+    }
+
+    /** Base64-encodes the UTF-8 bytes of {@code str}. */
+    public static String base64Encode(String str) {
+        return base64Encode(str, UTF8);
+    }
+
+    /** Decodes a standard (non URL-safe) Base64 string. */
+    public static byte[] base64Decode(String str) {
+        return Base64.getDecoder().decode(str);
+    }
+
+    /** Base64-encodes {@code byteArray} with the standard alphabet. */
+    public static String base64Encode(byte[] byteArray) {
+        return Base64.getEncoder().encodeToString(byteArray);
+    }
+
+    /** Base64-encodes {@code byteArray}, then URL-encodes the result as UTF-8. */
+    public static String base64AndUrlEncode(byte[] byteArray) {
+        return base64AndUrlEncode(byteArray, UTF8);
+    }
+
+    /** Base64-encodes {@code byteArray}, then URL-encodes the result using {@code charset}. */
+    public static String base64AndUrlEncode(byte[] byteArray, Charset charset) {
+        return urlEncode(base64Encode(byteArray), charset);
+    }
+
+    /** URL-encodes {@code str} as UTF-8. */
+    public static String urlEncode(String str) {
+        return urlEncode(str, UTF8);
+    }
+
+    /** URL-encodes {@code str} using {@code charset}. */
+    public static String urlEncode(String str, Charset charset) {
+        return URLEncoder.encode(str, charset);
+    }
+
+    /**
+     * Encrypts the current data key of {@code msgCrypto} with the public key named {@code publicKeyName}.
+     *
+     * @throws PulsarClientException.CryptoException if the key reader has no such key or the encryption fails
+     */
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, CryptoKeyReader cryptoKeyReader,
+                                                    String publicKeyName)
+            throws PulsarClientException.CryptoException {
+        EncryptionKeyInfo encryptionKeyInfo = cryptoKeyReader.getPublicKey(publicKeyName, Collections.emptyMap());
+        // A key reader may legitimately return null; report that instead of failing with an NPE.
+        if (encryptionKeyInfo == null || encryptionKeyInfo.getKey() == null) {
+            throw new PulsarClientException.CryptoException("Public key " + publicKeyName + " was not found");
+        }
+        return calculateEncryptedKeyValue(msgCrypto, encryptionKeyInfo.getKey());
+    }
+
+    /**
+     * Serializes {@code obj} to JSON, Base64-encodes the JSON and URL-encodes the result.
+     *
+     * @throws PulsarClientException.CryptoException if {@code obj} cannot be serialized
+     */
+    public static String toJSONAndBase64AndUrlEncode(Object obj)
+            throws PulsarClientException.CryptoException {
+        try {
+            String json = ObjectMapperFactory.getMapper().getObjectMapper().writeValueAsString(obj);
+            return urlEncode(base64Encode(json));
+        } catch (JsonProcessingException e) {
+            // The thrown exception carries only a message, so keep the cause visible in the log.
+            log.error("Failed to serialize object {} to JSON.", obj, e);
+            throw new PulsarClientException.CryptoException(String.format("Serialize object %s failed", obj));
+        }
+    }
+
+    /**
+     * Encrypts the current data key of {@code msgCrypto} with the given public key.
+     *
+     * @param publicKeyData the public key bytes, in the form accepted by {@link MessageCryptoBc#loadPublicKey}
+     * @throws PulsarClientException.CryptoException if the key cannot be loaded or the encryption fails
+     */
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, byte[] publicKeyData)
+            throws PulsarClientException.CryptoException {
+        try {
+            PublicKey pubKey = MessageCryptoBc.loadPublicKey(publicKeyData);
+            Cipher dataKeyCipher = loadAndInitCipher(pubKey);
+            return dataKeyCipher.doFinal(msgCrypto.getDataKey().getEncoded());
+        } catch (Exception e) {
+            // Pass the exception as the last argument so that the stack trace is logged as well.
+            log.error("Failed to encrypt data key. {}", e.getMessage(), e);
+            throw new PulsarClientException.CryptoException(e.getMessage());
+        }
+    }
+
+    /** Creates a BouncyCastle cipher in encrypt mode matching the algorithm of {@code pubKey} (RSA or ECDSA). */
+    private static Cipher loadAndInitCipher(PublicKey pubKey) throws PulsarClientException.CryptoException,
+            NoSuchAlgorithmException, NoSuchProviderException, NoSuchPaddingException, InvalidKeyException,
+            InvalidAlgorithmParameterException {
+        final String algorithm = pubKey.getAlgorithm();
+        if (RSA.equals(algorithm)) {
+            Cipher rsaCipher = Cipher.getInstance(RSA_TRANS, BouncyCastleProvider.PROVIDER_NAME);
+            rsaCipher.init(Cipher.ENCRYPT_MODE, pubKey);
+            return rsaCipher;
+        }
+        if (ECDSA.equals(algorithm)) {
+            Cipher eciesCipher = Cipher.getInstance(ECIES, BouncyCastleProvider.PROVIDER_NAME);
+            AlgorithmParameterSpec params = MessageCryptoBc.createIESParameterSpec();
+            eciesCipher.init(Cipher.ENCRYPT_MODE, pubKey, params);
+            return eciesCipher;
+        }
+        String msg =  "Unsupported key type " + algorithm;
+        log.error(msg);
+        throw new PulsarClientException.CryptoException(msg);
+    }
+
+    /**
+     * Compresses {@code payload} with the codec for {@code compressionType}.
+     *
+     * @return the compressed bytes, or {@code payload} itself when the type is {@code null} or {@code NONE}
+     */
+    public static byte[] compressionIfNeeded(CompressionType compressionType, byte[] payload) {
+        if (compressionType == null || CompressionType.NONE.equals(compressionType)) {
+            return payload;
+        }
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf input = PulsarByteBufAllocator.DEFAULT.buffer(payload.length, payload.length);
+        try {
+            input.writeBytes(payload);
+            return readAllAndRelease(codec.encode(input));
+        } finally {
+            // Released in finally so that a codec failure does not leak the buffer.
+            input.release();
+        }
+    }
+
+    /**
+     * Encrypts {@code payload} with the key named {@code keyName}.
+     *
+     * @return the Base64-encoded cipher text together with the Base64-encoded encryption parameter
+     */
+    public static EncryptedPayloadAndParam encryptPayload(CryptoKeyReader cryptoKeyReader, MessageCryptoBc msgCrypto,
+                                                           byte[] payload, String keyName)
+            throws PulsarClientException {
+        ByteBuffer plainPayload = ByteBuffer.wrap(payload);
+        // Size the output the same way decryptMsgPayload does instead of hard-coding the overhead.
+        ByteBuffer encryptedPayload = ByteBuffer.allocate(msgCrypto.getMaxOutputSize(payload.length));
+        // Only used to read back the encryption parameter that encrypt() stores in it.
+        MessageMetadata metadata = new MessageMetadata();
+        msgCrypto.encrypt(Collections.singleton(keyName), cryptoKeyReader,
+                () -> metadata, plainPayload, encryptedPayload);
+        byte[] res = new byte[encryptedPayload.remaining()];
+        encryptedPayload.get(res);
+        return new EncryptedPayloadAndParam(base64Encode(res), base64Encode(metadata.getEncryptionParam()));
+    }
+
+    /** Holder for a Base64-encoded encrypted payload and its Base64-encoded encryption parameter. */
+    @AllArgsConstructor
+    public static class EncryptedPayloadAndParam {
+        public final String encryptedPayload;
+        public final String encryptionParam;
+    }
+
+    /**
+     * Base64-decodes {@code payloadString} and, when an encryption context is present, decrypts it.
+     *
+     * @return the decoded bytes as-is when {@code encryptionContext} is {@code null}, else the decrypted bytes
+     * @throws IllegalStateException if {@code msgCrypto} reports that the payload could not be decrypted
+     */
+    public static byte[] decryptMsgPayload(String payloadString, EncryptionContext encryptionContext,
+                                           CryptoKeyReader cryptoKeyReader, MessageCryptoBc msgCrypto) {
+        byte[] payload = base64Decode(payloadString);
+        if (encryptionContext == null) {
+            return payload;
+        }
+
+        MessageMetadata messageMetadata = toMessageMetadata(encryptionContext);
+        ByteBuffer input = ByteBuffer.wrap(payload);
+        ByteBuffer output = ByteBuffer.allocate(msgCrypto.getMaxOutputSize(payload.length));
+
+        // decrypt() signals failure through its return value; ignoring it would hand back a zero-filled buffer.
+        if (!msgCrypto.decrypt(() -> messageMetadata, input, output, cryptoKeyReader)) {
+            throw new IllegalStateException("Failed to decrypt the message payload with keys "
+                    + encryptionContext.getKeys().keySet());
+        }
+        byte[] res = new byte[output.remaining()];
+        output.get(res);
+        return res;
+    }
+
+    /** Copies the keys, their metadata and the encryption parameter of {@code encryptionContext}. */
+    private static MessageMetadata toMessageMetadata(EncryptionContext encryptionContext) {
+        MessageMetadata messageMetadata = new MessageMetadata();
+        for (Map.Entry<String, EncryptionContext.EncryptionKey> entry : encryptionContext.getKeys().entrySet()) {
+            EncryptionContext.EncryptionKey key = entry.getValue();
+            EncryptionKeys encryptionKeys = messageMetadata.addEncryptionKey()
+                    .setKey(entry.getKey()).setValue(key.getKeyValue());
+            if (key.getMetadata() != null) {
+                for (Map.Entry<String, String> prop : key.getMetadata().entrySet()) {
+                    encryptionKeys.addMetadata().setKey(prop.getKey()).setValue(prop.getValue());
+                }
+            }
+        }
+        messageMetadata.setEncryptionParam(encryptionContext.getParam());
+        return messageMetadata;
+    }
+
+    /**
+     * Decompresses {@code payloadBytes} according to the compression type recorded in {@code encryptionContext}.
+     *
+     * @return the decompressed bytes, or {@code payloadBytes} itself when the context is {@code null} or records
+     *         no compression
+     */
+    public static byte[] unCompressionIfNeeded(byte[] payloadBytes, EncryptionContext encryptionContext)
+            throws IOException {
+        // decryptMsgPayload accepts a null context, so accept it here too.
+        if (encryptionContext == null || encryptionContext.getCompressionType() == null
+                || org.apache.pulsar.client.api.CompressionType.NONE
+                .equals(encryptionContext.getCompressionType())) {
+            return payloadBytes;
+        }
+        CompressionCodec codec =
+                CompressionCodecProvider.getCompressionCodec(encryptionContext.getCompressionType());
+        ByteBuf input = PulsarByteBufAllocator.DEFAULT.buffer(payloadBytes.length, payloadBytes.length);
+        try {
+            input.writeBytes(payloadBytes);
+            return readAllAndRelease(codec.decode(input, encryptionContext.getUncompressedMessageSize()));
+        } finally {
+            // Released in finally so that a codec failure does not leak the buffer.
+            input.release();
+        }
+    }
+
+    /**
+     * Note: this method does not parse the message in its entirety; it only parses the payload of the message.
+     *
+     * @return one {@link ConsumerMessage} per entry of the batch when the context records a batch size,
+     *         otherwise a single message holding {@code payloadBytes} decoded as UTF-8
+     */
+    public static List<ConsumerMessage> extractBatchMessagesIfNeeded(byte[] payloadBytes,
+                                                              EncryptionContext encryptionContext)
+            throws IOException {
+        if (encryptionContext == null || !encryptionContext.getBatchSize().isPresent()) {
+            ConsumerMessage msg = new ConsumerMessage();
+            msg.payload = new String(payloadBytes, UTF8);
+            return Collections.singletonList(msg);
+        }
+
+        int batchSize = encryptionContext.getBatchSize().get();
+        List<ConsumerMessage> res = new ArrayList<>();
+        // Allocate only on the batch path, and always give the buffer back.
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(payloadBytes.length);
+        try {
+            payload.writeBytes(payloadBytes);
+            for (int i = 0; i < batchSize; i++) {
+                ConsumerMessage msg = new ConsumerMessage();
+                SingleMessageMetadata singleMsgMetadata = new SingleMessageMetadata();
+                ByteBuf singleMsgPayload = Commands.deSerializeSingleMessageInBatch(payload, singleMsgMetadata, i,
+                        batchSize);
+                try {
+                    if (singleMsgMetadata.getPayloadSize() < 1) {
+                        msg.payload = null;
+                    } else {
+                        byte[] bs = new byte[singleMsgPayload.readableBytes()];
+                        singleMsgPayload.readBytes(bs);
+                        msg.payload = new String(bs, UTF8);
+                    }
+                } finally {
+                    // The per-message buffer returned above is owned by the caller and must be released.
+                    singleMsgPayload.release();
+                }
+                res.add(msg);
+            }
+        } finally {
+            payload.release();
+        }
+        return res;
+    }
+
+    /** Copies the readable bytes of {@code buf} into a new array and releases {@code buf}. */
+    private static byte[] readAllAndRelease(ByteBuf buf) {
+        try {
+            byte[] res = new byte[buf.readableBytes()];
+            buf.readBytes(res);
+            return res;
+        } finally {
+            buf.release();
+        }
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
new file mode 100644
index 00000000000..df9392db2de
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+
+/**
+ * A no-op {@link CryptoKeyReader}. It provides neither public nor private keys: both lookups always
+ * return {@code null}.
+ */
+public class DummyCryptoKeyReaderImpl implements CryptoKeyReader {
+
+    /** The only instance; the constructor is private. */
+    public static final DummyCryptoKeyReaderImpl INSTANCE = new DummyCryptoKeyReaderImpl();
+
+    private DummyCryptoKeyReaderImpl() {
+    }
+
+    /** Always returns {@code null}, whatever the key name and metadata. */
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+        return null;
+    }
+
+    /** Always returns {@code null}, whatever the key name and metadata. */
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+        return null;
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
 
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
index 146f066ae2c..5778b0701a4 100644
--- 
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
+++ 
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
@@ -53,6 +53,7 @@ import javax.crypto.SecretKey;
 import javax.crypto.ShortBufferException;
 import javax.crypto.spec.GCMParameterSpec;
 import javax.crypto.spec.SecretKeySpec;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
@@ -83,14 +84,14 @@ import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
 @Slf4j
 public class MessageCryptoBc implements MessageCrypto<MessageMetadata, 
MessageMetadata> {
 
-    private static final String ECDSA = "ECDSA";
-    private static final String RSA = "RSA";
-    private static final String ECIES = "ECIES";
+    public static final String ECDSA = "ECDSA";
+    public static final String RSA = "RSA";
+    public static final String ECIES = "ECIES";
 
     // Ideally the transformation should also be part of the message property. 
This will prevent client
     // from assuming hardcoded value. However, it will increase the size of 
the message even further.
-    private static final String RSA_TRANS = 
"RSA/NONE/OAEPWithSHA1AndMGF1Padding";
-    private static final String AESGCM = "AES/GCM/NoPadding";
+    public static final String RSA_TRANS = 
"RSA/NONE/OAEPWithSHA1AndMGF1Padding";
+    public static final String AESGCM = "AES/GCM/NoPadding";
 
     private static KeyGenerator keyGenerator;
     private static final int tagLen = 16 * 8;
@@ -100,6 +101,7 @@ public class MessageCryptoBc implements 
MessageCrypto<MessageMetadata, MessageMe
     private String logCtx;
 
     // Data key which is used to encrypt message
+    @Getter
     private SecretKey dataKey;
     private LoadingCache<ByteBuffer, SecretKey> dataKeyCache;
 
@@ -177,7 +179,7 @@ public class MessageCryptoBc implements 
MessageCrypto<MessageMetadata, MessageMe
 
     }
 
-    private PublicKey loadPublicKey(byte[] keyBytes) throws Exception {
+    public static PublicKey loadPublicKey(byte[] keyBytes) throws Exception {
 
         Reader keyReader = new StringReader(new String(keyBytes));
         PublicKey publicKey = null;
@@ -354,7 +356,7 @@ public class MessageCryptoBc implements 
MessageCrypto<MessageMetadata, MessageMe
     }
 
     // required since Bouncycastle 1.72 when using ECIES, it is required to 
pass in an IESParameterSpec
-    private IESParameterSpec createIESParameterSpec() {
+    public static IESParameterSpec createIESParameterSpec() {
         // the IESParameterSpec to use was discovered by debugging 
BouncyCastle 1.69 and running the
         // test 
org.apache.pulsar.client.api.SimpleProducerConsumerTest#testCryptoWithChunking
         return new IESParameterSpec(null, null, 128);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 551950ebc58..ed2970d5e41 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1739,7 +1739,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (conf.getCryptoKeyReader() == null) {
             switch (conf.getCryptoFailureAction()) {
                 case CONSUME:
-                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
+                    log.debug("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
                             topic, subscription, consumerName);
                     return payload.retain();
                 case DISCARD:
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 267b06649d7..9bd1421d035 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -586,11 +586,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
             msgMetadata.setProducerName(producerName);
 
-            if (conf.getCompressionType() != CompressionType.NONE) {
-                msgMetadata
-                        
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+            // An "uncompressedSize" of zero means that the compression info has not been set yet.
+            if (msgMetadata.getUncompressedSize() <= 0) {
+                if (conf.getCompressionType() != CompressionType.NONE) {
+                    msgMetadata
+                            
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+                }
+                msgMetadata.setUncompressedSize(uncompressedSize);
             }
-            msgMetadata.setUncompressedSize(uncompressedSize);
         }
     }
 
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 2ab62b10ee9..08a23eebdae 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -465,6 +465,8 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
 
         if (service.getCryptoKeyReader().isPresent()) {
             builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+        } else {
+            // If users want to decrypt messages themselves, they should set 
"cryptoFailureAction" to "CONSUME".
         }
         return builder;
     }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ad1283fe84..6a3f77ed037 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.websocket;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import static 
org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
 import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
 import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectReader;
 import com.google.common.base.Enums;
 import java.io.IOException;
@@ -33,25 +35,31 @@ import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.DummyCryptoKeyReaderImpl;
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
 import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.service.WSSDummyMessageCryptoImpl;
 import org.apache.pulsar.websocket.stats.StatsBuckets;
 import org.eclipse.jetty.websocket.api.WriteCallback;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -76,6 +84,7 @@ public class ProducerHandler extends AbstractWebSocketHandler 
{
     private final LongAdder numBytesSent;
     private final StatsBuckets publishLatencyStatsUSec;
     private volatile long msgPublishedCounter = 0;
+    private boolean clientSideEncrypt;
     private static final AtomicLongFieldUpdater<ProducerHandler> 
MSG_PUBLISHED_COUNTER_UPDATER =
             AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, 
"msgPublishedCounter");
 
@@ -98,6 +107,12 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
 
         try {
             this.producer = 
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
+            if (clientSideEncrypt) {
+                log.info("[{}] [{}] The producer session is created with param 
encryptionKeyValues, which means that"
+                                + " message encryption will be done on the 
client side, then the server will skip "
+                                + "batch message processing, message 
compression processing, and message encryption"
+                                + " processing", producer.getTopic(), 
producer.getProducerName());
+            }
             if (!this.service.addProducer(this)) {
                 log.warn("[{}:{}] Failed to add producer handler for topic 
{}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
@@ -159,7 +174,7 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
         }
 
         final long msgSize = rawPayload.length;
-        TypedMessageBuilder<byte[]> builder = producer.newMessage();
+        TypedMessageBuilderImpl<byte[]> builder = 
(TypedMessageBuilderImpl<byte[]>) producer.newMessage();
 
         try {
             builder.value(rawPayload);
@@ -192,6 +207,37 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
             builder.deliverAfter(sendRequest.deliverAfterMs, 
TimeUnit.MILLISECONDS);
         }
 
+        // If client-side encryption is enabled, the attributes
+        // "encryptionParam", "compressionType", "uncompressedMessageSize"
+        // and "batchSize" of message metadata must be set according to the
+        // parameters when the client sends messages.
+        if (clientSideEncrypt) {
+            try {
+                if (!StringUtils.isBlank(sendRequest.encryptionParam)) {
+                    
builder.getMetadataBuilder().setEncryptionParam(Base64.getDecoder()
+                            .decode(sendRequest.encryptionParam));
+                }
+            } catch (Exception e){
+                String msg = format("Invalid Base64 encryptionParam error=%s", 
e.getMessage());
+                sendAckResponse(new ProducerAck(PayloadEncodingError, msg, 
null, requestContext));
+                return;
+            }
+            if (sendRequest.compressionType != null && 
sendRequest.uncompressedMessageSize != null) {
+                // Set compression information.
+                
builder.getMetadataBuilder().setCompression(sendRequest.compressionType);
+                
builder.getMetadataBuilder().setUncompressedSize(sendRequest.uncompressedMessageSize);
+            } else if 
((org.apache.pulsar.common.api.proto.CompressionType.NONE.equals(sendRequest.compressionType)
+                    || sendRequest.compressionType == null)
+                    && sendRequest.uncompressedMessageSize == null) {
+                // Nothing to do, the method send async will set these two 
attributes.
+            } else {
+                // Only one param is set.
+                sendAckResponse(new ProducerAck(PayloadEncodingError, "the 
params compressionType and"
+                        + " uncompressedMessageSize should both empty or both 
non-empty",
+                        null, requestContext));
+                return;
+            }
+        }
+
         final long now = System.nanoTime();
 
         builder.sendAsync().thenAccept(msgId -> {
@@ -205,8 +251,8 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
                 sendAckResponse(new ProducerAck(messageId, 
sendRequest.context));
             }
         }).exceptionally(exception -> {
-            log.warn("[{}] Error occurred while producer handler was sending 
msg from {}: {}", producer.getTopic(),
-                    getRemote().getInetSocketAddress().toString(), 
exception.getMessage());
+            log.warn("[{}] Error occurred while producer handler was sending 
msg from {}", producer.getTopic(),
+                    getRemote().getInetSocketAddress().toString(), exception);
             numMsgsFailed.increment();
             sendAckResponse(
                     new ProducerAck(UnknownError, exception.getMessage(), 
null, sendRequest.context));
@@ -315,30 +361,127 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
             
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
-        if (queryParams.containsKey("batchingEnabled")) {
-            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+        if (queryParams.containsKey("messageRoutingMode")) {
+            checkArgument(
+                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
+                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
+            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+                builder.messageRoutingMode(routingMode);
+            }
         }
 
-        if (queryParams.containsKey("batchingMaxMessages")) {
-            
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+        Map<String, EncryptionKey> encryptionKeyMap = 
tryToExtractJsonEncryptionKeys();
+        if (encryptionKeyMap != null) {
+            popularProducerBuilderForClientSideEncrypt(builder, 
encryptionKeyMap);
+        } else {
+            popularProducerBuilderForServerSideEncrypt(builder);
         }
+        return builder;
+    }
 
-        if (queryParams.containsKey("maxPendingMessages")) {
-            
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+    /**
+     * Tries to read the query parameter {@code encryptionKeys} as a
+     * Base64-encoded JSON object mapping key name to {@link EncryptionKey}.
+     *
+     * @return the decoded key map, or {@code null} when the parameter is
+     *         absent, is not valid Base64, cannot be parsed as the expected
+     *         JSON, is an empty map, or its first entry carries no key value.
+     *         The caller treats {@code null} as "not client-side encryption".
+     */
+    private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+        if (!queryParams.containsKey("encryptionKeys")) {
+            return null;
+        }
+        // Base64 decode; a value that is not Base64 is not treated as an
+        // error, the method just reports "no JSON keys" by returning null.
+        byte[] param = null;
+        try {
+            param = 
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+        } catch (Exception base64DecodeEx) {
+            return null;
+        }
+        try {
+            Map<String, EncryptionKey> keys = 
ObjectMapperFactory.getMapper().getObjectMapper()
+                    .readValue(param, new TypeReference<Map<String, 
EncryptionKey>>() {});
+            if (keys.isEmpty()) {
+                return null;
+            }
+            // Only the first entry is probed here; per-entry validation is
+            // left to the code that consumes the returned map.
+            if (keys.values().iterator().next().getKeyValue() == null) {
+                return null;
+            }
+            return keys;
+        } catch (IOException ex) {
+            return null;
         }
+    }
 
-        if (queryParams.containsKey("batchingMaxPublishDelay")) {
-            
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
-                    TimeUnit.MILLISECONDS);
+    /**
+     * Configures {@code builder} for a session in which the WebSocket client has already encrypted the payload.
+     *
+     * <p>Every entry of {@code encryptionKeyMap} is validated with {@code checkArgument}; its trimmed name is
+     * registered on the builder and its key value and metadata are captured in per-key slots. The injected
+     * {@link WSSDummyMessageCryptoImpl} copies all captured keys into the message metadata whenever its
+     * {@code encrypt} method is invoked. Batching, compression and chunking are switched off on the builder.
+     *
+     * @param builder the producer builder to populate
+     * @param encryptionKeyMap encryption keys supplied by the client, keyed by key name
+     */
+    private void popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+                                                            Map<String, EncryptionKey> encryptionKeyMap) {
+        this.clientSideEncrypt = true;
+        int keysLen = encryptionKeyMap.size();
+        final String[] keyNameArray = new String[keysLen];
+        final byte[][] keyValueArray = new byte[keysLen][];
+        final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+        // Format keys.
+        int index = 0;
+        for (Map.Entry<String, EncryptionKey> entry : encryptionKeyMap.entrySet()) {
+            checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param encryptionKeys.key");
+            checkArgument(entry.getValue() != null, "Empty param encryptionKeys.value");
+            checkArgument(entry.getValue().getKeyValue() != null, "Empty param encryptionKeys.value.keyValue");
+            keyNameArray[index] = StringUtils.trim(entry.getKey());
+            keyValueArray[index] = entry.getValue().getKeyValue();
+            if (entry.getValue().getMetadata() == null) {
+                keyMetadataArray[index] = Collections.emptyList();
+            } else {
+                keyMetadataArray[index] = entry.getValue().getMetadata().entrySet().stream()
+                        .map(e -> new KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                        .collect(Collectors.toList());
+            }
+            builder.addEncryptionKey(keyNameArray[index]);
+            // Advance to the next slot; without this every key overwrites slot 0 and the remaining slots
+            // stay null, which breaks the metadata decorator below as soon as more than one key is supplied.
+            index++;
         }
+        // Background: The order of message payload process during message sending:
+        //  1. The Producer will composite several message payloads into a batched message payload if the
+        //    producer is enabled batch;
+        //  2. The Producer will compress the batched message payload to a compressed payload if enabled
+        //    compression;
+        //  3. After the previous two steps, the Producer encrypts the compressed payload to an encrypted
+        //    payload.
+        //
+        // Since the order of producer operation for message payloads is "compression --> encryption", users
+        // need to handle Compression themselves if needed. We just disable server-side batch process,
+        // server-side compression, and server-side encryption, and only set the corresponding message metadata.
+        builder.enableBatching(false);
+        // Disable server-side compression, and just set compression attributes into the message metadata when
+        // sending messages(see the method "onWebSocketText").
+        builder.compressionType(CompressionType.NONE);
+        // Disable server-side encryption, and just set encryption attributes into the message metadata when
+        // sending messages(see the method "onWebSocketText").
+        builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
+        // Set the param `enableChunking` to `false`(the default value is `false`) to prevent unexpected
+        // problems if the default setting is changed in the future.
+        builder.enableChunking(false);
+        // Inject encryption metadata decorator.
+        builder.messageCrypto(new WSSDummyMessageCryptoImpl(msgMetadata -> {
+            for (int i = 0; i < keyNameArray.length; i++) {
+                msgMetadata.addEncryptionKey().setKey(keyNameArray[i]).setValue(keyValueArray[i])
+                        .addAllMetadatas(keyMetadataArray[i]);
+            }
+        }));
+        // Do warning param check and print warning log.
+        printLogIfSettingDiscardedBatchedParams();
+        printLogIfSettingDiscardedCompressionParams();
+    }
 
-        if (queryParams.containsKey("messageRoutingMode")) {
-            checkArgument(
-                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
-                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
-            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
-            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
-                builder.messageRoutingMode(routingMode);
+    private void 
popularProducerBuilderForServerSideEncrypt(ProducerBuilder<byte[]> builder) {
+        this.clientSideEncrypt = false;
+        if (queryParams.containsKey("batchingEnabled")) {
+            boolean batchingEnabled = 
Boolean.parseBoolean(queryParams.get("batchingEnabled"));
+            if (batchingEnabled) {
+                builder.enableBatching(true);
+                if (queryParams.containsKey("batchingMaxMessages")) {
+                    
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+                }
+
+                if (queryParams.containsKey("maxPendingMessages")) {
+                    
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+                }
+
+                if (queryParams.containsKey("batchingMaxPublishDelay")) {
+                    
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+                            TimeUnit.MILLISECONDS);
+                }
+            } else {
+                builder.enableBatching(false);
+                printLogIfSettingDiscardedBatchedParams();
             }
         }
 
@@ -356,7 +499,27 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
                 builder.addEncryptionKey(key);
             }
         }
-        return builder;
+    }
+
+    /**
+     * Logs an info line for each batching-related query parameter that was
+     * supplied but will not take effect.
+     *
+     * <p>The {@code batchingEnabled} notice is emitted only when
+     * {@code clientSideEncrypt} is set; the notices for
+     * {@code batchingMaxMessages}, {@code maxPendingMessages} and
+     * {@code batchingMaxPublishDelay} are emitted whenever the parameter is
+     * present.
+     *
+     * <p>NOTE(review): this method is also called from the client-side
+     * encryption path, where the "Since batchingEnabled is false" wording may
+     * not match what the client actually passed - confirm whether the message
+     * should differ there.
+     */
+    private void printLogIfSettingDiscardedBatchedParams() {
+        if (clientSideEncrypt && queryParams.containsKey("batchingEnabled")) {
+            log.info("Since clientSideEncrypt is true, the param 
batchingEnabled of producer will be ignored");
+        }
+        if (queryParams.containsKey("batchingMaxMessages")) {
+            log.info("Since batchingEnabled is false, the param 
batchingMaxMessages of producer will be ignored");
+        }
+        if (queryParams.containsKey("maxPendingMessages")) {
+            log.info("Since batchingEnabled is false, the param 
maxPendingMessages of producer will be ignored");
+        }
+        if (queryParams.containsKey("batchingMaxPublishDelay")) {
+            log.info("Since batchingEnabled is false, the param 
batchingMaxPublishDelay of producer will be ignored");
+        }
+    }
+
+    /**
+     * Logs an info line when {@code clientSideEncrypt} is set and the query
+     * parameter {@code compressionType} was supplied, because that parameter
+     * is then ignored.
+     */
+    private void printLogIfSettingDiscardedCompressionParams() {
+        if (clientSideEncrypt && queryParams.containsKey("compressionType")) {
+            log.info("Since clientSideEncrypt is true, the param 
compressionType of producer will be ignored");
+        }
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ProducerHandler.class);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
index 4831b905514..12cb3b20c19 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.api.proto.CompressionType;
 
 /**
  * Class represent single message to be published.
@@ -69,4 +70,13 @@ public class ProducerMessage {
 
     // Base64 encoded serialized schema for payload
     public String valueSchema;
+
+    // Base64 encoded serialized initialization vector used when the client 
encrypts.
+    public String encryptionParam;
+
+    // Compression type. Do not set it if compression is not performed.
+    public CompressionType compressionType;
+
+    // The size of the payload before compression. Do not set it if 
compression is not performed.
+    public Integer uncompressedMessageSize;
 }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
new file mode 100644
index 00000000000..43f7a368bbb
--- /dev/null
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.service;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+/**
+ * A {@link MessageCrypto} used in scenarios where the payload of the message
+ * has already been encrypted and the producer does not need to encrypt it
+ * again.
+ * It performs no payload encryption or decryption: payloads are copied to the
+ * output buffer unchanged, and sending relies on
+ * {@code metadataModifierForSend} to set the encryption info into the message
+ * metadata.
+ */
+public class WSSDummyMessageCryptoImpl implements 
MessageCrypto<MessageMetadata, MessageMetadata> {
+
+    /** Shared instance whose metadata modifier does nothing. */
+    public static final WSSDummyMessageCryptoImpl INSTANCE_FOR_CONSUMER =
+            new WSSDummyMessageCryptoImpl(msgMetadata -> {});
+
+    /** Callback applied to the message metadata at the end of {@code encrypt}. */
+    private final Consumer<MessageMetadata> metadataModifierForSend;
+
+    /**
+     * @param metadataModifierForSend callback that receives the message
+     *                                metadata each time {@code encrypt} runs
+     */
+    public WSSDummyMessageCryptoImpl(Consumer<MessageMetadata> 
metadataModifierForSend) {
+        this.metadataModifierForSend = metadataModifierForSend;
+    }
+
+    /** No-op: this implementation keeps no key ciphers. */
+    @Override
+    public void addPublicKeyCipher(Set keyNames, CryptoKeyReader keyReader)
+            throws PulsarClientException.CryptoException {}
+
+    /** Always reports success; there is nothing to remove. */
+    @Override
+    public boolean removeKeyCipher(String keyName) {
+        return true;
+    }
+
+    /** The output is a plain copy of the input, so the sizes are equal. */
+    @Override
+    public int getMaxOutputSize(int inputLen) {
+        return inputLen;
+    }
+
+    /**
+     * Copies {@code payload} into {@code outBuffer} unchanged and flips the
+     * buffer for reading.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean decrypt(Supplier<MessageMetadata> messageMetadataSupplier, 
ByteBuffer payload, ByteBuffer outBuffer,
+                           CryptoKeyReader keyReader) {
+        outBuffer.put(payload);
+        outBuffer.flip();
+        return true;
+    }
+
+    /**
+     * Copies {@code payload} into {@code outBuffer} unchanged, flips the
+     * buffer for reading, then passes the supplied message metadata to
+     * {@code metadataModifierForSend}. {@code encKeys} and {@code keyReader}
+     * are not used.
+     */
+    @Override
+    public synchronized void encrypt(Set<String> encKeys, CryptoKeyReader 
keyReader,
+                                    Supplier<MessageMetadata> 
messageMetadataSupplier,
+                                    ByteBuffer payload, ByteBuffer outBuffer) 
throws PulsarClientException {
+        outBuffer.put(payload);
+        outBuffer.flip();
+        metadataModifierForSend.accept(messageMetadataSupplier.get());
+    }
+}
diff --git 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
index 5f773a8e2e1..d09b5941f24 100644
--- 
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
+++ 
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -53,7 +54,7 @@ public class ProducerHandlerTest {
         PulsarClient pulsarClient = mock(PulsarClient.class);
         ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
         Producer producer = mock(Producer.class);
-        TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
+        TypedMessageBuilder messageBuilder = 
mock(TypedMessageBuilderImpl.class);
         ProducerMessage produceRequest = new ProducerMessage();
 
         produceRequest.setDeliverAfterMs(11111);

Reply via email to