This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07eef59e16b [feat][ws] PIP-290 Make WSS support E2E encryption (#20958)
07eef59e16b is described below
commit 07eef59e16bca11788c2ec63c5f61d3bdcadb10d
Author: fengyubiao <[email protected]>
AuthorDate: Fri Aug 25 15:57:34 2023 +0800
[feat][ws] PIP-290 Make WSS support E2E encryption (#20958)
See PIP: https://github.com/apache/pulsar/pull/20923
---
.../proxy/ClientSideEncryptionWssConsumer.java | 154 +++++++++
.../proxy/ClientSideEncryptionWssProducer.java | 200 ++++++++++++
.../websocket/proxy/CryptoKeyReaderForTest.java | 77 +++++
...roxyPublishConsumeClientSideEncryptionTest.java | 360 +++++++++++++++++++++
.../websocket/proxy/WssClientSideEncryptUtils.java | 280 ++++++++++++++++
.../client/api/DummyCryptoKeyReaderImpl.java | 41 +++
.../pulsar/client/impl/crypto/MessageCryptoBc.java | 16 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 11 +-
.../apache/pulsar/websocket/ConsumerHandler.java | 2 +
.../apache/pulsar/websocket/ProducerHandler.java | 205 ++++++++++--
.../pulsar/websocket/data/ProducerMessage.java | 10 +
.../service/WSSDummyMessageCryptoImpl.java | 77 +++++
.../pulsar/websocket/ProducerHandlerTest.java | 3 +-
14 files changed, 1404 insertions(+), 34 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java
new file mode 100644
index 00000000000..ab8372cbfb0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssConsumer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.testng.Assert.assertTrue;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * Test-only WebSocket consumer that does the consumer side of client-side encryption by itself.
+ *
+ * <p>It connects to the v2 consumer endpoint of the WebSocket proxy with {@code cryptoFailureAction=CONSUME},
+ * then decrypts, un-compresses and (for batches) splits every received payload through
+ * {@link WssClientSideEncryptUtils}. The resulting messages are queued until {@link #receive(int, TimeUnit)}
+ * picks them up.
+ */
+@Slf4j
+@WebSocket(maxTextMessageSize = 64 * 1024)
+public class ClientSideEncryptionWssConsumer extends WebSocketAdapter implements Closeable {
+
+    private Session session;
+    private final CryptoKeyReader cryptoKeyReader;
+    private final String topicName;
+    private final String subscriptionName;
+    private final SubscriptionType subscriptionType;
+    private final String webSocketProxyHost;
+    private final int webSocketProxyPort;
+    private WebSocketClient wssClient;
+    private final MessageCryptoBc msgCrypto;
+    /** Decrypted messages waiting to be handed out by {@link #receive(int, TimeUnit)}. */
+    private final LinkedBlockingQueue<ConsumerMessage> incomingMessages = new LinkedBlockingQueue<>();
+
+    /**
+     * @param webSocketProxyHost host of the WebSocket proxy.
+     * @param webSocketProxyPort port of the WebSocket proxy.
+     * @param topicName topic to consume from, appended to {@code /ws/v2/consumer/persistent/}.
+     * @param subscriptionName name of the subscription.
+     * @param subscriptionType subscription type passed as a query parameter.
+     * @param cryptoKeyReader key reader used when decrypting received payloads.
+     */
+    public ClientSideEncryptionWssConsumer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
+                                           String subscriptionName, SubscriptionType subscriptionType,
+                                           CryptoKeyReader cryptoKeyReader) {
+        this.webSocketProxyHost = webSocketProxyHost;
+        this.webSocketProxyPort = webSocketProxyPort;
+        this.topicName = topicName;
+        this.subscriptionName = subscriptionName;
+        this.subscriptionType = subscriptionType;
+        this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + subscriptionName + "]", false);
+        this.cryptoKeyReader = cryptoKeyReader;
+    }
+
+    /**
+     * Starts the WebSocket client, connects to the proxy and asserts that the session is open.
+     *
+     * @throws Exception if the client can not be started or the connection fails.
+     */
+    public void start() throws Exception {
+        wssClient = new WebSocketClient();
+        wssClient.start();
+        session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
+        assertTrue(session.isOpen());
+    }
+
+    /** Builds the consumer endpoint URL, including the subscription type and the crypto failure action. */
+    private URI buildConnectURL() throws PulsarClientException.CryptoException {
+        final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;
+
+        // Build the URL for consumer.
+        final StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
+                .append("/ws/v2/consumer/persistent/")
+                .append(topicName)
+                .append("/")
+                .append(subscriptionName)
+                .append("?")
+                .append("subscriptionType=").append(subscriptionType.toString())
+                .append("&").append("cryptoFailureAction=CONSUME");
+        return URI.create(consumerUri.toString());
+    }
+
+    /**
+     * Waits for the next decrypted message.
+     *
+     * @param timeout how long to wait.
+     * @param unit unit of {@code timeout}.
+     * @return the next message, or {@code null} if none arrived within the timeout.
+     * @throws Exception if the wait is interrupted.
+     */
+    public synchronized ConsumerMessage receive(int timeout, TimeUnit unit) throws Exception {
+        return incomingMessages.poll(timeout, unit);
+    }
+
+    @Override
+    public void onWebSocketClose(int statusCode, String reason) {
+        log.info("Connection closed: {} - {}", statusCode, reason);
+        this.session = null;
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        log.info("Got connect: {}", session);
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+        log.error("Received an error", cause);
+    }
+
+    /**
+     * Parses a frame pushed by the proxy, decodes its payload and queues the resulting message(s).
+     * Frames without a message id, and frames that fail to decode, are logged and dropped.
+     */
+    @Override
+    public void onWebSocketText(String text) {
+        try {
+            ConsumerMessage msg =
+                    ObjectMapperFactory.getMapper().reader().readValue(text, ConsumerMessage.class);
+            if (msg.messageId == null) {
+                log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName,
+                        subscriptionName, text);
+                return;
+            }
+            // Decrypt.
+            byte[] decryptedPayload = WssClientSideEncryptUtils.decryptMsgPayload(msg.payload,
+                    msg.encryptionContext, cryptoKeyReader, msgCrypto);
+            // Un-compression if needed.
+            byte[] unCompressedPayload = WssClientSideEncryptUtils.unCompressionIfNeeded(decryptedPayload,
+                    msg.encryptionContext);
+            // Extract batch messages if needed.
+            if (msg.encryptionContext.getBatchSize().isPresent()) {
+                List<ConsumerMessage> singleMsgs = WssClientSideEncryptUtils.extractBatchMessagesIfNeeded(
+                        unCompressedPayload, msg.encryptionContext);
+                incomingMessages.addAll(singleMsgs);
+            } else {
+                msg.payload = new String(unCompressedPayload, StandardCharsets.UTF_8);
+                incomingMessages.add(msg);
+            }
+        } catch (Exception ex) {
+            // Keep the cause in the log, otherwise a decrypt/parse failure can not be diagnosed.
+            log.error("Consumer[{}-{}] Could not extract the response payload: {}", topicName,
+                    subscriptionName, text, ex);
+        }
+    }
+
+    /**
+     * Stops the underlying WebSocket client. Does nothing if {@link #start()} was never called.
+     *
+     * @throws IOException if the client fails to stop; the original failure is attached as the cause.
+     */
+    @Override
+    public void close() throws IOException {
+        if (wssClient == null) {
+            return;
+        }
+        try {
+            wssClient.stop();
+        } catch (Exception e) {
+            throw new IOException("Failed to stop the WebSocket client", e);
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java
new file mode 100644
index 00000000000..f9ae6cd4344
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ClientSideEncryptionWssProducer.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.testng.Assert.assertTrue;
+import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
+import static
org.apache.pulsar.websocket.proxy.WssClientSideEncryptUtils.EncryptedPayloadAndParam;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+
+/**
+ * Test-only WebSocket producer that does the producer side of client-side encryption by itself.
+ *
+ * <p>The encrypted data key is passed to the proxy through the {@code encryptionKeys} query parameter when
+ * connecting. Every message is compressed (if requested) and encrypted locally through
+ * {@link WssClientSideEncryptUtils} before it is sent. Only one message can be in flight at a time.
+ */
+@Slf4j
+@WebSocket(maxTextMessageSize = 64 * 1024)
+public class ClientSideEncryptionWssProducer extends WebSocketAdapter implements Closeable {
+
+    // Written by the WebSocket callbacks and read by sendMessage, hence volatile.
+    private volatile Session session;
+    /** Future of the message currently in flight, {@code null} until the first send. */
+    private volatile CompletableFuture<MessageIdData> sendFuture;
+    private final ScheduledExecutorService executor;
+    private final CryptoKeyReader cryptoKeyReader;
+    private final String topicName;
+    private final String producerName;
+    private final String webSocketProxyHost;
+    private final int webSocketProxyPort;
+    private final String keyName;
+    private WebSocketClient wssClient;
+    private final MessageCryptoBc msgCrypto;
+
+    /**
+     * @param webSocketProxyHost host of the WebSocket proxy.
+     * @param webSocketProxyPort port of the WebSocket proxy.
+     * @param topicName topic to publish to, appended to {@code /ws/v2/producer/persistent/}.
+     * @param producerName name used in the log context of the crypto helper.
+     * @param cryptoKeyReader key reader used to load the public key.
+     * @param keyName name of the encryption key.
+     * @param executor executor used to schedule the send timeout.
+     */
+    public ClientSideEncryptionWssProducer(String webSocketProxyHost, int webSocketProxyPort, String topicName,
+                                           String producerName, CryptoKeyReader cryptoKeyReader, String keyName,
+                                           ScheduledExecutorService executor) {
+        this.webSocketProxyHost = webSocketProxyHost;
+        this.webSocketProxyPort = webSocketProxyPort;
+        this.topicName = topicName;
+        this.producerName = producerName;
+        this.msgCrypto = new MessageCryptoBc("[" + topicName + "] [" + producerName + "]", true);
+        this.cryptoKeyReader = cryptoKeyReader;
+        this.keyName = keyName;
+        this.executor = executor;
+    }
+
+    /**
+     * Starts the WebSocket client, connects to the proxy and asserts that the session is open.
+     *
+     * @throws Exception if the client can not be started or the connection fails.
+     */
+    public void start() throws Exception {
+        wssClient = new WebSocketClient();
+        wssClient.start();
+        session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get();
+        assertTrue(session.isOpen());
+    }
+
+    /** Builds the producer endpoint URL carrying the encoded encryption key map. */
+    private URI buildConnectURL() throws PulsarClientException.CryptoException {
+        final String protocolAndHostPort = "ws://" + webSocketProxyHost + ":" + webSocketProxyPort;
+
+        // Encode encrypted public key data.
+        final byte[] keyValue =
+                WssClientSideEncryptUtils.calculateEncryptedKeyValue(msgCrypto, cryptoKeyReader, keyName);
+        EncryptionKey encryptionKey = new EncryptionKey();
+        encryptionKey.setKeyValue(keyValue);
+        encryptionKey.setMetadata(cryptoKeyReader.getPublicKey(keyName, Collections.emptyMap()).getMetadata());
+        Map<String, EncryptionKey> encryptionKeyMap = new HashMap<>();
+        encryptionKeyMap.put(keyName, encryptionKey);
+
+        final String encryptionKeys = WssClientSideEncryptUtils.toJSONAndBase64AndUrlEncode(encryptionKeyMap);
+
+        // Build the URL for producer.
+        final StringBuilder producerUrL = new StringBuilder(protocolAndHostPort)
+                .append("/ws/v2/producer/persistent/")
+                .append(topicName)
+                .append("?")
+                .append("encryptionKeys=").append(encryptionKeys);
+        return URI.create(producerUrL.toString());
+    }
+
+    /**
+     * Compresses (if requested) and encrypts the payload of {@code msg}, sends it and waits for the response.
+     * The passed message is modified: its payload and encryption param are replaced by the encrypted values.
+     *
+     * @param msg the message to send; its payload must not be {@code null}.
+     * @return the message id reported by the proxy.
+     * @throws IllegalArgumentException if another message is still in flight or the payload is {@code null}.
+     * @throws IllegalStateException if the WebSocket session is not open.
+     * @throws Exception if sending fails; a missing response surfaces after 50 seconds as an
+     *     {@code ExecutionException} caused by a {@link TimeoutException}.
+     */
+    public synchronized MessageIdData sendMessage(ProducerMessage msg) throws Exception {
+        if (sendFuture != null && !sendFuture.isDone()) {
+            throw new IllegalArgumentException("There is a message still in sending.");
+        }
+        if (msg.payload == null) {
+            throw new IllegalArgumentException("Null value message is not supported.");
+        }
+        final Session currentSession = this.session;
+        if (currentSession == null) {
+            throw new IllegalStateException("The WebSocket session is not open.");
+        }
+        // Compression.
+        byte[] unCompressedPayload = msg.payload.getBytes(StandardCharsets.UTF_8);
+        byte[] compressedPayload =
+                WssClientSideEncryptUtils.compressionIfNeeded(msg.compressionType, unCompressedPayload);
+        if (msg.compressionType != null && !CompressionType.NONE.equals(msg.compressionType)) {
+            msg.uncompressedMessageSize = unCompressedPayload.length;
+        }
+        // Encrypt.
+        EncryptedPayloadAndParam encryptedPayloadAndParam = WssClientSideEncryptUtils.encryptPayload(
+                cryptoKeyReader, msgCrypto, compressedPayload, keyName);
+        msg.payload = encryptedPayloadAndParam.encryptedPayload;
+        msg.encryptionParam = encryptedPayloadAndParam.encryptionParam;
+        String jsonMsg = ObjectMapperFactory.getMapper().writer().writeValueAsString(msg);
+        // Do send.
+        final CompletableFuture<MessageIdData> future = new CompletableFuture<>();
+        sendFuture = future;
+        try {
+            currentSession.getRemote().sendString(jsonMsg);
+        } catch (Exception e) {
+            // Do not leave a pending future behind, it would block every following send.
+            future.completeExceptionally(e);
+            throw e;
+        }
+        // Wait for response. The timeout task must not take this object's monitor: this method holds it
+        // while blocking in get(), so a task that synchronizes on it could never run. It also completes
+        // the future of THIS send instead of re-reading the field, which may belong to a later send.
+        final java.util.concurrent.ScheduledFuture<?> timeoutTask = executor.schedule(() -> {
+            future.completeExceptionally(new TimeoutException("Send timeout"));
+        }, 50, TimeUnit.SECONDS);
+        try {
+            return future.get();
+        } finally {
+            timeoutTask.cancel(false);
+        }
+    }
+
+    @Override
+    public void onWebSocketClose(int statusCode, String reason) {
+        log.info("Connection closed: {} - {}", statusCode, reason);
+        this.session = null;
+        // The connection may be closed before anything was sent, then there is nothing to fail.
+        final CompletableFuture<MessageIdData> pending = sendFuture;
+        if (pending != null) {
+            // No-op if the future is already completed.
+            pending.completeExceptionally(new RuntimeException("Connection was closed"));
+        }
+    }
+
+    @Override
+    public void onWebSocketConnect(Session session) {
+        log.info("Got connect: {}", session);
+        this.session = session;
+    }
+
+    @Override
+    public void onWebSocketError(Throwable cause) {
+        log.error("Received an error", cause);
+    }
+
+    /**
+     * Handles the response of a send. A response carrying a non-zero error code or an error message fails the
+     * pending send, and so does a response that can not be parsed. Otherwise the Base64 encoded message id
+     * completes the pending send.
+     */
+    @Override
+    public void onWebSocketText(String text) {
+        final CompletableFuture<MessageIdData> pending = sendFuture;
+        if (pending == null) {
+            log.warn("Received a response while no message is in sending: {}", text);
+            return;
+        }
+        try {
+            ResponseOfSend responseOfSend =
+                    ObjectMapperFactory.getMapper().reader().readValue(text, ResponseOfSend.class);
+            if (responseOfSend.getErrorCode() != 0 || responseOfSend.getErrorMsg() != null) {
+                pending.completeExceptionally(new RuntimeException(text));
+            } else {
+                byte[] bytes = Base64.getDecoder().decode(responseOfSend.getMessageId());
+                MessageIdData messageIdData = new MessageIdData();
+                messageIdData.parseFrom(bytes);
+                pending.complete(messageIdData);
+            }
+        } catch (Exception ex) {
+            log.error("Could not extract the response payload: {}", text, ex);
+            // Fail fast instead of letting the sender wait for the timeout.
+            pending.completeExceptionally(ex);
+        }
+    }
+
+    /**
+     * Stops the underlying WebSocket client. Does nothing if {@link #start()} was never called.
+     *
+     * @throws IOException if the client fails to stop; the original failure is attached as the cause.
+     */
+    @Override
+    public void close() throws IOException {
+        if (wssClient == null) {
+            return;
+        }
+        try {
+            wssClient.stop();
+        } catch (Exception e) {
+            throw new IOException("Failed to stop the WebSocket client", e);
+        }
+    }
+
+    /** JSON shape of the response that the proxy sends back for a published message. */
+    @Data
+    public static class ResponseOfSend {
+        private String result;
+        private String messageId;
+        private String errorMsg;
+        // Stays -1 when the response carries no error code, which is then treated as a failure.
+        private int errorCode = -1;
+        private int schemaVersion;
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java
new file mode 100644
index 00000000000..ff9cde766df
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/CryptoKeyReaderForTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.testng.Assert;
+
+/**
+ * {@link CryptoKeyReader} for tests which loads keys from {@code ./src/test/resources/certificate/} and
+ * attaches the same random metadata to every key it returns.
+ */
+public class CryptoKeyReaderForTest implements CryptoKeyReader {
+
+    /** Random entries generated once per JVM; tests compare received key metadata against this map. */
+    public static final Map<String, String> RANDOM_METADATA = new HashMap<>();
+
+    static {
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        RANDOM_METADATA.put(UUID.randomUUID().toString(), UUID.randomUUID().toString());
+    }
+
+    /**
+     * Loads {@code public-key.<keyName>}; the {@code metadata} argument is not used.
+     * Fails the running test if the file is missing or unreadable.
+     */
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+        return loadKey("public-key.", keyName);
+    }
+
+    /**
+     * Loads {@code private-key.<keyName>}; the {@code metadata} argument is not used.
+     * Fails the running test if the file is missing or unreadable.
+     */
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+        return loadKey("private-key.", keyName);
+    }
+
+    /** Reads the key file {@code <filePrefix><keyName>} and wraps its bytes together with the metadata. */
+    private static EncryptionKeyInfo loadKey(String filePrefix, String keyName) {
+        final String certFilePath = "./src/test/resources/certificate/" + filePrefix + keyName;
+        if (!Files.isReadable(Paths.get(certFilePath))) {
+            Assert.fail("Certificate file " + certFilePath + " is not present or not readable.");
+        }
+        try {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+            keyInfo.setKey(Files.readAllBytes(Paths.get(certFilePath)));
+            // The metadata is meaningless, just to test that it can be transferred to the consumer.
+            keyInfo.setMetadata(RANDOM_METADATA);
+            return keyInfo;
+        } catch (IOException e) {
+            // Keep the cause, so the test report shows why the file could not be read.
+            Assert.fail("Failed to read certificate from " + certFilePath, e);
+            // Not reached, Assert.fail always throws.
+            return null;
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
new file mode 100644
index 00000000000..ad69df4757f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeClientSideEncryptionTest.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.websocket.WebSocketService;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.service.ProxyServer;
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+import org.apache.pulsar.websocket.service.WebSocketServiceStarter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies client-side encryption through the WebSocket proxy in every direction: WSS producer to Java
+ * consumer, Java producer to WSS consumer, and WSS producer to WSS consumer. Compression and batching are
+ * covered too.
+ */
+@Slf4j
+@Test(groups = "websocket")
+public class ProxyPublishConsumeClientSideEncryptionTest extends ProducerConsumerBase {
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+    private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    // Payloads are encoded as UTF-8 when sending, so they must be decoded as UTF-8, not the platform default.
+    private static final Charset charset = StandardCharsets.UTF_8;
+    private static final String WEB_SOCKET_PROXY_HOST = "localhost";
+
+    private ProxyServer proxyServer;
+    private WebSocketService service;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        super.internalSetup();
+        super.producerBaseSetup();
+
+        WebSocketProxyConfiguration config = new WebSocketProxyConfiguration();
+        config.setWebServicePort(Optional.of(0));
+        config.setClusterName("test");
+        config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
+        // Assign the field (not a local variable), otherwise cleanup() can never close the service.
+        service = spy(new WebSocketService(config));
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service)
+                .createConfigMetadataStore(anyString(), anyInt(), anyBoolean());
+        proxyServer = new ProxyServer(config);
+        WebSocketServiceStarter.start(proxyServer, service);
+        log.info("Proxy Server Started");
+    }
+
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        if (service != null) {
+            service.close();
+        }
+        if (proxyServer != null) {
+            proxyServer.stop();
+        }
+        // Stop the thread used for the send timeouts of the WSS producers.
+        executor.shutdownNow();
+        log.info("Finished Cleaning Up Test setup");
+    }
+
+    @DataProvider(name = "encryptKeyNames")
+    public Object[][] encryptKeyNames() {
+        return new Object[][]{
+                {"client-ecdsa.pem"},
+                {"client-rsa.pem"}
+        };
+    }
+
+    @DataProvider(name = "compressionTypes")
+    public Object[][] compressionTypes() {
+        return new Object[][]{
+                {CompressionType.NONE},
+                {CompressionType.LZ4},
+                {CompressionType.ZLIB},
+                {CompressionType.SNAPPY},
+                {CompressionType.ZSTD}
+        };
+    }
+
+    @DataProvider(name = "compressionTypesForJ")
+    public Object[][] compressionTypesForJ() {
+        return new Object[][]{
+                {org.apache.pulsar.client.api.CompressionType.NONE},
+                {org.apache.pulsar.client.api.CompressionType.LZ4},
+                {org.apache.pulsar.client.api.CompressionType.ZLIB},
+                {org.apache.pulsar.client.api.CompressionType.SNAPPY},
+                {org.apache.pulsar.client.api.CompressionType.ZSTD}
+        };
+    }
+
+    /** Creates a uniquely named non-partitioned topic with a subscription positioned at the earliest message. */
+    private String createTopicWithSubscription(String subscriptionName) throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("public/default/tp_");
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
+        return topicName;
+    }
+
+    /** Creates and starts a WSS producer connected to the proxy started in {@link #setup()}. */
+    private ClientSideEncryptionWssProducer startWssProducer(String topicName, String producerName,
+                                                             CryptoKeyReader cryptoKeyReader, String keyName)
+            throws Exception {
+        final int webSocketProxyPort = proxyServer.getListenPortHTTP().get();
+        ClientSideEncryptionWssProducer producer = new ClientSideEncryptionWssProducer(WEB_SOCKET_PROXY_HOST,
+                webSocketProxyPort, topicName, producerName, cryptoKeyReader, keyName, executor);
+        producer.start();
+        return producer;
+    }
+
+    /** Creates and starts a shared-subscription WSS consumer connected to the proxy started in setup. */
+    private ClientSideEncryptionWssConsumer startWssConsumer(String topicName, String subscriptionName,
+                                                             CryptoKeyReader cryptoKeyReader) throws Exception {
+        final int webSocketProxyPort = proxyServer.getListenPortHTTP().get();
+        ClientSideEncryptionWssConsumer consumer = new ClientSideEncryptionWssConsumer(WEB_SOCKET_PROXY_HOST,
+                webSocketProxyPort, topicName, subscriptionName, SubscriptionType.Shared, cryptoKeyReader);
+        consumer.start();
+        return consumer;
+    }
+
+    /** Builds a message with key "k"; {@code compressionType} may be {@code null} to leave it unset. */
+    private static ProducerMessage newProducerMessage(String payload, CompressionType compressionType) {
+        ProducerMessage message = new ProducerMessage();
+        message.key = "k";
+        message.payload = payload;
+        message.compressionType = compressionType;
+        return message;
+    }
+
+    @Test(dataProvider = "encryptKeyNames")
+    public void testWssSendAndJavaConsumeWithEncryption(String keyName) throws Exception {
+        final String subscriptionName = "s1";
+        final String producerName = "wss-p1";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+
+        // Create wss producer.
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer producer =
+                startWssProducer(topicName, producerName, cryptoKeyReader, keyName);
+
+        // Send message.
+        String msgPayloadBeforeEncrypt = "msg-123";
+        MessageIdData messageIdData = producer.sendMessage(newProducerMessage(msgPayloadBeforeEncrypt, null));
+        log.info("send success: {}", messageIdData.toString());
+
+        // Consume.
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().cryptoKeyReader(cryptoKeyReader)
+                .topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Message<byte[]> msgReceived = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(new String(msgReceived.getData(), charset), msgPayloadBeforeEncrypt);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "compressionTypes")
+    public void testWssSendAndJavaConsumeWithEncryptionAndCompression(CompressionType compressionType)
+            throws Exception {
+        final String subscriptionName = "s1";
+        final String producerName = "wss-p1";
+        final String keyName = "client-ecdsa.pem";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+
+        // Create wss producer.
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer producer =
+                startWssProducer(topicName, producerName, cryptoKeyReader, keyName);
+
+        // Send message.
+        String originalPayload = "msg-123";
+        MessageIdData messageIdData = producer.sendMessage(newProducerMessage(originalPayload, compressionType));
+        log.info("send success: {}", messageIdData.toString());
+
+        // Consume.
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().cryptoKeyReader(cryptoKeyReader)
+                .topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        Message<byte[]> msgReceived = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(new String(msgReceived.getData(), charset), originalPayload);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "encryptKeyNames")
+    public void testJavaSendAndWssConsumeWithEncryption(String keyName) throws Exception {
+        final String subscriptionName = "s1";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+
+        final String originalPayload = "msg-123";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).addEncryptionKey(keyName)
+                .cryptoKeyReader(cryptoKeyReader).create();
+        producer.send(originalPayload.getBytes(StandardCharsets.UTF_8));
+
+        // Create wss consumer.
+        ClientSideEncryptionWssConsumer consumer = startWssConsumer(topicName, subscriptionName, cryptoKeyReader);
+
+        // Receive message.
+        ConsumerMessage message = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(message.payload, originalPayload);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "compressionTypesForJ")
+    public void testJavaSendAndWssConsumeWithEncryptionAndCompression(
+            org.apache.pulsar.client.api.CompressionType compressionType) throws Exception {
+        final String subscriptionName = "s1";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        final String keyName = "client-ecdsa.pem";
+
+        final String originalPayload = "msg-123";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).addEncryptionKey(keyName).compressionType(compressionType)
+                .cryptoKeyReader(cryptoKeyReader).create();
+        producer.send(originalPayload.getBytes(StandardCharsets.UTF_8));
+
+        // Create wss consumer.
+        ClientSideEncryptionWssConsumer consumer = startWssConsumer(topicName, subscriptionName, cryptoKeyReader);
+
+        // Receive message.
+        ConsumerMessage message = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(message.payload, originalPayload);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test
+    public void testJavaSendAndWssConsumeWithEncryptionAndCompressionAndBatch() throws Exception {
+        final String subscriptionName = "s1";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        final String keyName = "client-ecdsa.pem";
+
+        final HashSet<String> messagesSent = new HashSet<>();
+        // The huge publish delay keeps all messages in one batch until flush() is called.
+        Producer<byte[]> producer = pulsarClient.newProducer().enableBatching(true)
+                .batchingMaxMessages(1000)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .topic(topicName).addEncryptionKey(keyName)
+                .compressionType(org.apache.pulsar.client.api.CompressionType.LZ4)
+                .cryptoKeyReader(cryptoKeyReader).create();
+        for (int i = 0; i < 10; i++) {
+            String payload = "msg-" + i;
+            messagesSent.add(payload);
+            producer.sendAsync(payload.getBytes(StandardCharsets.UTF_8));
+        }
+        producer.flush();
+
+        // Create wss consumer.
+        ClientSideEncryptionWssConsumer consumer = startWssConsumer(topicName, subscriptionName, cryptoKeyReader);
+
+        // Receive messages until nothing arrives within the timeout.
+        final HashSet<String> messagesReceived = new HashSet<>();
+        for (ConsumerMessage message = consumer.receive(2, TimeUnit.SECONDS); message != null;
+             message = consumer.receive(2, TimeUnit.SECONDS)) {
+            messagesReceived.add(message.payload);
+        }
+        assertEquals(messagesReceived, messagesSent);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+
+    @Test(dataProvider = "encryptKeyNames")
+    public void testWssSendAndWssConsumeWithEncryption(String keyName) throws Exception {
+        final String subscriptionName = "s1";
+        final String producerName = "wss-p1";
+        final String topicName = createTopicWithSubscription(subscriptionName);
+
+        // Create wss producer.
+        final CryptoKeyReader cryptoKeyReader = new CryptoKeyReaderForTest();
+        ClientSideEncryptionWssProducer producer =
+                startWssProducer(topicName, producerName, cryptoKeyReader, keyName);
+
+        // Send message.
+        String msgPayloadBeforeEncrypt = "msg-123";
+        MessageIdData messageIdData = producer.sendMessage(newProducerMessage(msgPayloadBeforeEncrypt, null));
+        log.info("send success: {}", messageIdData.toString());
+
+        // Consume.
+        ClientSideEncryptionWssConsumer consumer = startWssConsumer(topicName, subscriptionName, cryptoKeyReader);
+        ConsumerMessage msgReceived = consumer.receive(2, TimeUnit.SECONDS);
+        assertEquals(msgReceived.payload, msgPayloadBeforeEncrypt);
+        // The key metadata set by the producer must reach the consumer unchanged.
+        assertEquals(msgReceived.encryptionContext.getKeys().get(keyName).getMetadata(),
+                CryptoKeyReaderForTest.RANDOM_METADATA);
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
new file mode 100644
index 00000000000..dbbfb756625
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WssClientSideEncryptUtils.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECDSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.ECIES;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA;
+import static org.apache.pulsar.client.impl.crypto.MessageCryptoBc.RSA_TRANS;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
+import java.security.PublicKey;
+import java.security.spec.AlgorithmParameterSpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.crypto.Cipher;
+import javax.crypto.NoSuchPaddingException;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.EncryptionKeyInfo;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.common.api.proto.CompressionType;
+import org.apache.pulsar.common.api.proto.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ConsumerMessage;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+
+/**
+ * Test helpers covering the work a WebSocket client has to do itself when it uses client-side encryption:
+ * Base64/URL encoding of request parameters, compressing and encrypting payloads before sending, and
+ * decrypting, decompressing and un-batching payloads after receiving.
+ */
+@Slf4j
+public class WssClientSideEncryptUtils {
+
+    /** Charset used by the overloads that do not take an explicit charset. */
+    public static Charset UTF8 = StandardCharsets.UTF_8;
+
+    /** Base64-encodes the UTF-8 bytes of {@code str}, then URL-encodes the result. */
+    public static String base64AndUrlEncode(String str) {
+        return base64AndUrlEncode(str.getBytes(UTF8), UTF8);
+    }
+
+    /** Base64-encodes the bytes of {@code str} in {@code charset}, then URL-encodes the result with it. */
+    public static String base64AndUrlEncode(String str, Charset charset) {
+        return base64AndUrlEncode(str.getBytes(charset), charset);
+    }
+
+    /** Base64-encodes the bytes of {@code str} in {@code charset}. */
+    public static String base64Encode(String str, Charset charset) {
+        return Base64.getEncoder().encodeToString(str.getBytes(charset));
+    }
+
+    /** Base64-encodes the UTF-8 bytes of {@code str}. */
+    public static String base64Encode(String str) {
+        return Base64.getEncoder().encodeToString(str.getBytes(UTF8));
+    }
+
+    /** Decodes a Base64 string into raw bytes. */
+    public static byte[] base64Decode(String str) {
+        return Base64.getDecoder().decode(str);
+    }
+
+    /** Base64-encodes {@code byteArray}. */
+    public static String base64Encode(byte[] byteArray) {
+        return Base64.getEncoder().encodeToString(byteArray);
+    }
+
+    /** Base64-encodes {@code byteArray}, then URL-encodes the result using UTF-8. */
+    public static String base64AndUrlEncode(byte[] byteArray) {
+        String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+        return URLEncoder.encode(base64Encode, UTF8);
+    }
+
+    /** Base64-encodes {@code byteArray}, then URL-encodes the result using {@code charset}. */
+    public static String base64AndUrlEncode(byte[] byteArray, Charset charset) {
+        String base64Encode = Base64.getEncoder().encodeToString(byteArray);
+        return URLEncoder.encode(base64Encode, charset);
+    }
+
+    /** URL-encodes {@code str} using UTF-8. */
+    public static String urlEncode(String str) {
+        return URLEncoder.encode(str, UTF8);
+    }
+
+    /** URL-encodes {@code str} using {@code charset}. */
+    public static String urlEncode(String str, Charset charset) {
+        return URLEncoder.encode(str, charset);
+    }
+
+    /**
+     * Encrypts the data key of {@code msgCrypto} with the public key that {@code cryptoKeyReader} returns for
+     * {@code publicKeyName}.
+     *
+     * @throws PulsarClientException.CryptoException if the reader supplies no key or the encryption fails
+     */
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, CryptoKeyReader cryptoKeyReader,
+                                                    String publicKeyName)
+            throws PulsarClientException.CryptoException {
+        EncryptionKeyInfo encryptionKeyInfo = cryptoKeyReader.getPublicKey(publicKeyName, Collections.emptyMap());
+        // A key reader may return nothing for an unknown key; report that instead of failing with an NPE.
+        if (encryptionKeyInfo == null || encryptionKeyInfo.getKey() == null) {
+            throw new PulsarClientException.CryptoException(
+                    String.format("Public key %s is not available", publicKeyName));
+        }
+        return calculateEncryptedKeyValue(msgCrypto, encryptionKeyInfo.getKey());
+    }
+
+    /**
+     * Serializes {@code obj} to JSON, Base64-encodes the JSON (UTF-8) and URL-encodes the result.
+     *
+     * @throws PulsarClientException.CryptoException if the object cannot be serialized to JSON
+     */
+    public static String toJSONAndBase64AndUrlEncode(Object obj)
+            throws PulsarClientException.CryptoException {
+        try {
+            String json = ObjectMapperFactory.getMapper().getObjectMapper()
+                    .writeValueAsString(obj);
+            return urlEncode(base64Encode(json));
+        } catch (JsonProcessingException e) {
+            throw new PulsarClientException.CryptoException(String.format("Serialize object %s failed", obj));
+        }
+    }
+
+    /**
+     * Encrypts the data key of {@code msgCrypto} with the public key loaded from {@code publicKeyData}.
+     *
+     * @throws PulsarClientException.CryptoException if loading the key or encrypting fails for any reason
+     */
+    public static byte[] calculateEncryptedKeyValue(MessageCryptoBc msgCrypto, byte[] publicKeyData)
+            throws PulsarClientException.CryptoException {
+        try {
+            PublicKey pubKey = MessageCryptoBc.loadPublicKey(publicKeyData);
+            Cipher dataKeyCipher = loadAndInitCipher(pubKey);
+            return dataKeyCipher.doFinal(msgCrypto.getDataKey().getEncoded());
+        } catch (Exception e) {
+            log.error("Failed to encrypt data key. {}", e.getMessage());
+            throw new PulsarClientException.CryptoException(e.getMessage());
+        }
+    }
+
+    /**
+     * Creates a BouncyCastle cipher in encrypt mode for {@code pubKey}: the RSA transformation for RSA keys,
+     * ECIES (with an IES parameter spec) for ECDSA keys.
+     *
+     * @throws PulsarClientException.CryptoException if the key algorithm is neither RSA nor ECDSA
+     */
+    private static Cipher loadAndInitCipher(PublicKey pubKey) throws PulsarClientException.CryptoException,
+            NoSuchAlgorithmException, NoSuchProviderException, NoSuchPaddingException, InvalidKeyException,
+            InvalidAlgorithmParameterException {
+        Cipher dataKeyCipher = null;
+        AlgorithmParameterSpec params = null;
+        // Encrypt data key using public key
+        if (RSA.equals(pubKey.getAlgorithm())) {
+            dataKeyCipher = Cipher.getInstance(RSA_TRANS, BouncyCastleProvider.PROVIDER_NAME);
+        } else if (ECDSA.equals(pubKey.getAlgorithm())) {
+            dataKeyCipher = Cipher.getInstance(ECIES, BouncyCastleProvider.PROVIDER_NAME);
+            params = MessageCryptoBc.createIESParameterSpec();
+        } else {
+            String msg = "Unsupported key type " + pubKey.getAlgorithm();
+            log.error(msg);
+            throw new PulsarClientException.CryptoException(msg);
+        }
+        if (params != null) {
+            dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey, params);
+        } else {
+            dataKeyCipher.init(Cipher.ENCRYPT_MODE, pubKey);
+        }
+        return dataKeyCipher;
+    }
+
+    /**
+     * Compresses {@code payload} with the codec for {@code compressionType}; returns {@code payload} itself
+     * when the type is {@code null} or {@code NONE}.
+     */
+    public static byte[] compressionIfNeeded(CompressionType compressionType, byte[] payload) {
+        if (compressionType != null && !CompressionType.NONE.equals(compressionType)) {
+            CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+            ByteBuf input = PulsarByteBufAllocator.DEFAULT.buffer(payload.length, payload.length);
+            input.writeBytes(payload);
+            ByteBuf output = codec.encode(input);
+            input.release();
+            byte[] res = new byte[output.readableBytes()];
+            output.readBytes(res);
+            output.release();
+            return res;
+        }
+        return payload;
+    }
+
+    /**
+     * Encrypts {@code payload} with {@code msgCrypto} under the key named {@code keyName}.
+     *
+     * @return the Base64-encoded encrypted payload together with the Base64-encoded encryption param read from
+     *         the scratch message metadata handed to {@code msgCrypto.encrypt}
+     */
+    public static EncryptedPayloadAndParam encryptPayload(CryptoKeyReader cryptoKeyReader, MessageCryptoBc msgCrypto,
+                                                          byte[] payload, String keyName)
+            throws PulsarClientException {
+        ByteBuffer unEncryptedMessagePayload = ByteBuffer.wrap(payload);
+        // The output buffer gets 512 bytes of headroom on top of the plain payload size.
+        ByteBuffer encryptedMessagePayload = ByteBuffer.allocate(unEncryptedMessagePayload.remaining() + 512);
+        // Scratch metadata: only the encryption param is read back from it afterwards.
+        MessageMetadata ignoredMessageMetadata = new MessageMetadata();
+        msgCrypto.encrypt(Collections.singleton(keyName), cryptoKeyReader,
+                () -> ignoredMessageMetadata, unEncryptedMessagePayload, encryptedMessagePayload);
+        byte[] res = new byte[encryptedMessagePayload.remaining()];
+        encryptedMessagePayload.get(res);
+        return new EncryptedPayloadAndParam(WssClientSideEncryptUtils.base64Encode(res),
+                WssClientSideEncryptUtils.base64Encode(ignoredMessageMetadata.getEncryptionParam()));
+    }
+
+    /** Holder for a Base64-encoded encrypted payload and its Base64-encoded encryption param. */
+    @AllArgsConstructor
+    public static class EncryptedPayloadAndParam {
+        public final String encryptedPayload;
+        public final String encryptionParam;
+    }
+
+    /**
+     * Base64-decodes {@code payloadString} and, when {@code encryptionContext} is present, decrypts it.
+     *
+     * <p>The encryption keys, their metadata and the encryption param are copied from the context into a fresh
+     * {@link MessageMetadata}, which is what {@code msgCrypto.decrypt} is given.
+     *
+     * @return the decoded bytes when {@code encryptionContext} is {@code null}, otherwise the decrypt output
+     */
+    public static byte[] decryptMsgPayload(String payloadString, EncryptionContext encryptionContext,
+                                           CryptoKeyReader cryptoKeyReader, MessageCryptoBc msgCrypto) {
+        byte[] payload = base64Decode(payloadString);
+        if (encryptionContext == null) {
+            return payload;
+        }
+
+        MessageMetadata messageMetadata = new MessageMetadata();
+        Map<String, EncryptionContext.EncryptionKey> encKeys = encryptionContext.getKeys();
+        for (Map.Entry<String, EncryptionContext.EncryptionKey> entry : encKeys.entrySet()) {
+            EncryptionKeys encryptionKeys = messageMetadata.addEncryptionKey()
+                    .setKey(entry.getKey()).setValue(entry.getValue().getKeyValue());
+            if (entry.getValue().getMetadata() != null) {
+                for (Map.Entry<String, String> prop : entry.getValue().getMetadata().entrySet()) {
+                    encryptionKeys.addMetadata().setKey(prop.getKey()).setValue(prop.getValue());
+                }
+            }
+        }
+        messageMetadata.setEncryptionParam(encryptionContext.getParam());
+
+        // Create input and output.
+        ByteBuffer input = ByteBuffer.allocate(payload.length);
+        ByteBuffer output = ByteBuffer.allocate(msgCrypto.getMaxOutputSize(payload.length));
+        input.put(payload);
+        input.flip();
+
+        // Decrypt.
+        // NOTE(review): any status returned by decrypt is not checked here - confirm failures are surfaced.
+        msgCrypto.decrypt(() -> messageMetadata, input, output, cryptoKeyReader);
+        byte[] res = new byte[output.limit()];
+        output.get(res);
+        return res;
+    }
+
+    /**
+     * Decompresses {@code payloadBytes} according to the compression type and uncompressed size recorded in
+     * {@code encryptionContext}; returns the input unchanged when there is no context or no compression.
+     */
+    public static byte[] unCompressionIfNeeded(byte[] payloadBytes, EncryptionContext encryptionContext)
+            throws IOException {
+        // Same tolerance as decryptMsgPayload: without a context there is nothing to undo.
+        if (encryptionContext == null) {
+            return payloadBytes;
+        }
+        if (encryptionContext.getCompressionType() != null && !org.apache.pulsar.client.api.CompressionType.NONE
+                .equals(encryptionContext.getCompressionType())) {
+            CompressionCodec codec =
+                    CompressionCodecProvider.getCompressionCodec(encryptionContext.getCompressionType());
+            ByteBuf input = PulsarByteBufAllocator.DEFAULT.buffer(payloadBytes.length, payloadBytes.length);
+            input.writeBytes(payloadBytes);
+            ByteBuf output = codec.decode(input, encryptionContext.getUncompressedMessageSize());
+            input.release();
+            byte[] res = new byte[output.readableBytes()];
+            output.readBytes(res);
+            output.release();
+            return res;
+        }
+        return payloadBytes;
+    }
+
+    /**
+     * Splits a batched payload into one {@link ConsumerMessage} per entry, or wraps a non-batched payload in a
+     * single message. Payload bytes are decoded as UTF-8 text.
+     *
+     * <p>Note: this method does not parse the message in its entirety; it only parses the payload of the message.
+     */
+    public static List<ConsumerMessage> extractBatchMessagesIfNeeded(byte[] payloadBytes,
+                                                                     EncryptionContext encryptionContext)
+            throws IOException {
+        // Non-batched (or context-less) payload: no buffer is needed at all.
+        if (encryptionContext == null || !encryptionContext.getBatchSize().isPresent()) {
+            ConsumerMessage msg = new ConsumerMessage();
+            msg.payload = new String(payloadBytes, UTF8);
+            return Collections.singletonList(msg);
+        }
+
+        ByteBuf payload = PulsarByteBufAllocator.DEFAULT.buffer(payloadBytes.length);
+        try {
+            payload.writeBytes(payloadBytes);
+            List<ConsumerMessage> res = new ArrayList<>();
+            int batchSize = encryptionContext.getBatchSize().get();
+            for (int i = 0; i < batchSize; i++) {
+                ConsumerMessage msg = new ConsumerMessage();
+                SingleMessageMetadata singleMsgMetadata = new SingleMessageMetadata();
+                // Presumably a retained slice of "payload" owned by the caller (verify against Commands);
+                // it is released below once its bytes have been copied out.
+                ByteBuf singleMsgPayload = Commands.deSerializeSingleMessageInBatch(payload, singleMsgMetadata, i,
+                        batchSize);
+                try {
+                    if (singleMsgMetadata.getPayloadSize() < 1) {
+                        msg.payload = null;
+                    } else {
+                        byte[] bs = new byte[singleMsgPayload.readableBytes()];
+                        singleMsgPayload.readBytes(bs);
+                        msg.payload = new String(bs, UTF8);
+                    }
+                } finally {
+                    singleMsgPayload.release();
+                }
+                res.add(msg);
+            }
+            return res;
+        } finally {
+            // Give the pooled buffer back even when parsing a batch entry fails.
+            payload.release();
+        }
+    }
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
new file mode 100644
index 00000000000..df9392db2de
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DummyCryptoKeyReaderImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Map;
+
+/**
+ * A no-op {@link CryptoKeyReader}: it holds no key material, so both lookups simply return {@code null}.
+ * Use the shared {@link #INSTANCE}; the constructor is private.
+ */
+public class DummyCryptoKeyReaderImpl implements CryptoKeyReader {
+
+    /** The only instance of this class. */
+    public static final DummyCryptoKeyReaderImpl INSTANCE = new DummyCryptoKeyReaderImpl();
+
+    private DummyCryptoKeyReaderImpl() {
+        // Singleton: obtain the reader through INSTANCE.
+    }
+
+    /** Always returns {@code null}; no private key is available. */
+    @Override
+    public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metadata) {
+        return null;
+    }
+
+    /** Always returns {@code null}; no public key is available. */
+    @Override
+    public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> metadata) {
+        return null;
+    }
+}
\ No newline at end of file
diff --git
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
index 146f066ae2c..5778b0701a4 100644
---
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
+++
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
@@ -53,6 +53,7 @@ import javax.crypto.SecretKey;
import javax.crypto.ShortBufferException;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
@@ -83,14 +84,14 @@ import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
@Slf4j
public class MessageCryptoBc implements MessageCrypto<MessageMetadata,
MessageMetadata> {
- private static final String ECDSA = "ECDSA";
- private static final String RSA = "RSA";
- private static final String ECIES = "ECIES";
+ public static final String ECDSA = "ECDSA";
+ public static final String RSA = "RSA";
+ public static final String ECIES = "ECIES";
// Ideally the transformation should also be part of the message property.
This will prevent client
// from assuming hardcoded value. However, it will increase the size of
the message even further.
- private static final String RSA_TRANS =
"RSA/NONE/OAEPWithSHA1AndMGF1Padding";
- private static final String AESGCM = "AES/GCM/NoPadding";
+ public static final String RSA_TRANS =
"RSA/NONE/OAEPWithSHA1AndMGF1Padding";
+ public static final String AESGCM = "AES/GCM/NoPadding";
private static KeyGenerator keyGenerator;
private static final int tagLen = 16 * 8;
@@ -100,6 +101,7 @@ public class MessageCryptoBc implements
MessageCrypto<MessageMetadata, MessageMe
private String logCtx;
// Data key which is used to encrypt message
+ @Getter
private SecretKey dataKey;
private LoadingCache<ByteBuffer, SecretKey> dataKeyCache;
@@ -177,7 +179,7 @@ public class MessageCryptoBc implements
MessageCrypto<MessageMetadata, MessageMe
}
- private PublicKey loadPublicKey(byte[] keyBytes) throws Exception {
+ public static PublicKey loadPublicKey(byte[] keyBytes) throws Exception {
Reader keyReader = new StringReader(new String(keyBytes));
PublicKey publicKey = null;
@@ -354,7 +356,7 @@ public class MessageCryptoBc implements
MessageCrypto<MessageMetadata, MessageMe
}
// required since Bouncycastle 1.72 when using ECIES, it is required to
pass in an IESParameterSpec
- private IESParameterSpec createIESParameterSpec() {
+ public static IESParameterSpec createIESParameterSpec() {
// the IESParameterSpec to use was discovered by debugging
BouncyCastle 1.69 and running the
// test
org.apache.pulsar.client.api.SimpleProducerConsumerTest#testCryptoWithChunking
return new IESParameterSpec(null, null, 128);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 551950ebc58..ed2970d5e41 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1739,7 +1739,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (conf.getCryptoKeyReader() == null) {
switch (conf.getCryptoFailureAction()) {
case CONSUME:
- log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
+ log.debug("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
topic, subscription, consumerName);
return payload.retain();
case DISCARD:
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 267b06649d7..9bd1421d035 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -586,11 +586,14 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
msgMetadata.setProducerName(producerName);
- if (conf.getCompressionType() != CompressionType.NONE) {
- msgMetadata
-
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+ // If the field "uncompressedSize" is zero, the compression info
has not been set yet.
+ if (msgMetadata.getUncompressedSize() <= 0) {
+ if (conf.getCompressionType() != CompressionType.NONE) {
+ msgMetadata
+
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
+ }
+ msgMetadata.setUncompressedSize(uncompressedSize);
}
- msgMetadata.setUncompressedSize(uncompressedSize);
}
}
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 2ab62b10ee9..08a23eebdae 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -465,6 +465,8 @@ public class ConsumerHandler extends
AbstractWebSocketHandler {
if (service.getCryptoKeyReader().isPresent()) {
builder.cryptoKeyReader(service.getCryptoKeyReader().get());
+ } else {
+ // If users want to decrypt messages themselves, they should set
"cryptoFailureAction" to "CONSUME".
}
return builder;
}
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 5ad1283fe84..6a3f77ed037 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.websocket;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import static
org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.base.Enums;
import java.io.IOException;
@@ -33,25 +35,31 @@ import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.DummyCryptoKeyReaderImpl;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.apache.pulsar.websocket.service.WSSDummyMessageCryptoImpl;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -76,6 +84,7 @@ public class ProducerHandler extends AbstractWebSocketHandler
{
private final LongAdder numBytesSent;
private final StatsBuckets publishLatencyStatsUSec;
private volatile long msgPublishedCounter = 0;
+ private boolean clientSideEncrypt;
private static final AtomicLongFieldUpdater<ProducerHandler>
MSG_PUBLISHED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ProducerHandler.class,
"msgPublishedCounter");
@@ -98,6 +107,12 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
try {
this.producer =
getProducerBuilder(service.getPulsarClient()).topic(topic.toString()).create();
+ if (clientSideEncrypt) {
+ log.info("[{}] [{}] The producer session is created with param
encryptionKeyValues, which means that"
+ + " message encryption will be done on the
client side, then the server will skip "
+ + "batch message processing, message
compression processing, and message encryption"
+ + " processing", producer.getTopic(),
producer.getProducerName());
+ }
if (!this.service.addProducer(this)) {
log.warn("[{}:{}] Failed to add producer handler for topic
{}", request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -159,7 +174,7 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
}
final long msgSize = rawPayload.length;
- TypedMessageBuilder<byte[]> builder = producer.newMessage();
+ TypedMessageBuilderImpl<byte[]> builder =
(TypedMessageBuilderImpl<byte[]>) producer.newMessage();
try {
builder.value(rawPayload);
@@ -192,6 +207,37 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
builder.deliverAfter(sendRequest.deliverAfterMs,
TimeUnit.MILLISECONDS);
}
+ // If client-side encryption is enabled, the attributes
"encryptionParam", "compressionType",
+ // "uncompressedMessageSize" and "batchSize" of message metadata must
be set according to the parameters
+ // when the client sends messages.
+ if (clientSideEncrypt) {
+ try {
+ if (!StringUtils.isBlank(sendRequest.encryptionParam)) {
+
builder.getMetadataBuilder().setEncryptionParam(Base64.getDecoder()
+ .decode(sendRequest.encryptionParam));
+ }
+ } catch (Exception e){
+ String msg = format("Invalid Base64 encryptionParam error=%s",
e.getMessage());
+ sendAckResponse(new ProducerAck(PayloadEncodingError, msg,
null, requestContext));
+ return;
+ }
+ if (sendRequest.compressionType != null &&
sendRequest.uncompressedMessageSize != null) {
+ // Set compression information.
+
builder.getMetadataBuilder().setCompression(sendRequest.compressionType);
+
builder.getMetadataBuilder().setUncompressedSize(sendRequest.uncompressedMessageSize);
+ } else if
((org.apache.pulsar.common.api.proto.CompressionType.NONE.equals(sendRequest.compressionType)
+ || sendRequest.compressionType == null)
+ && sendRequest.uncompressedMessageSize == null) {
+ // Nothing to do, the method send async will set these two
attributes.
+ } else {
+ // Only one param is set.
+ sendAckResponse(new ProducerAck(PayloadEncodingError, "the
params compressionType and"
+ + " uncompressedMessageSize should both empty or both
non-empty",
+ null, requestContext));
+ return;
+ }
+ }
+
final long now = System.nanoTime();
builder.sendAsync().thenAccept(msgId -> {
@@ -205,8 +251,8 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
sendAckResponse(new ProducerAck(messageId,
sendRequest.context));
}
}).exceptionally(exception -> {
- log.warn("[{}] Error occurred while producer handler was sending
msg from {}: {}", producer.getTopic(),
- getRemote().getInetSocketAddress().toString(),
exception.getMessage());
+ log.warn("[{}] Error occurred while producer handler was sending
msg from {}", producer.getTopic(),
+ getRemote().getInetSocketAddress().toString(), exception);
numMsgsFailed.increment();
sendAckResponse(
new ProducerAck(UnknownError, exception.getMessage(),
null, sendRequest.context));
@@ -315,30 +361,127 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")),
TimeUnit.MILLISECONDS);
}
- if (queryParams.containsKey("batchingEnabled")) {
-
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+ if (queryParams.containsKey("messageRoutingMode")) {
+ checkArgument(
+ Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
+ "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
+ MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+ if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+ builder.messageRoutingMode(routingMode);
+ }
}
- if (queryParams.containsKey("batchingMaxMessages")) {
-
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+ Map<String, EncryptionKey> encryptionKeyMap =
tryToExtractJsonEncryptionKeys();
+ if (encryptionKeyMap != null) {
+ popularProducerBuilderForClientSideEncrypt(builder,
encryptionKeyMap);
+ } else {
+ popularProducerBuilderForServerSideEncrypt(builder);
}
+ return builder;
+ }
- if (queryParams.containsKey("maxPendingMessages")) {
-
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+ private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+ if (!queryParams.containsKey("encryptionKeys")) {
+ return null;
+ }
+ // Base64 decode.
+ byte[] param = null;
+ try {
+ param =
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+ } catch (Exception base64DecodeEx) {
+ return null;
+ }
+ try {
+ Map<String, EncryptionKey> keys =
ObjectMapperFactory.getMapper().getObjectMapper()
+ .readValue(param, new TypeReference<Map<String,
EncryptionKey>>() {});
+ if (keys.isEmpty()) {
+ return null;
+ }
+ if (keys.values().iterator().next().getKeyValue() == null) {
+ return null;
+ }
+ return keys;
+ } catch (IOException ex) {
+ return null;
}
+ }
- if (queryParams.containsKey("batchingMaxPublishDelay")) {
-
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
- TimeUnit.MILLISECONDS);
+    /**
+     * Configures {@code builder} for a producer session whose messages are already encrypted by the client.
+     *
+     * <p>Marks this handler as client-side-encrypting and registers every key name from
+     * {@code encryptionKeyMap} on the builder. Batching, compression and chunking are turned off. A
+     * {@link DummyCryptoKeyReaderImpl} is installed, together with a {@link WSSDummyMessageCryptoImpl} whose
+     * callback copies the supplied keys (name, value, metadata) into the metadata of each message.
+     *
+     * @param builder the producer builder to configure
+     * @param encryptionKeyMap encryption keys supplied by the client, keyed by key name; every entry needs a
+     *                         non-blank name and a non-null value carrying a non-null key value
+     * @throws IllegalArgumentException if an entry violates the constraints above
+     */
+    private void popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+                                                            Map<String, EncryptionKey> encryptionKeyMap) {
+        this.clientSideEncrypt = true;
+        int keysLen = encryptionKeyMap.size();
+        final String[] keyNameArray = new String[keysLen];
+        final byte[][] keyValueArray = new byte[keysLen][];
+        final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+        // Format keys.
+        int index = 0;
+        for (Map.Entry<String, EncryptionKey> entry : encryptionKeyMap.entrySet()) {
+            checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param encryptionKeys.key");
+            checkArgument(entry.getValue() != null, "Empty param encryptionKeys.value");
+            checkArgument(entry.getValue().getKeyValue() != null, "Empty param encryptionKeys.value.keyValue");
+            keyNameArray[index] = StringUtils.trim(entry.getKey());
+            keyValueArray[index] = entry.getValue().getKeyValue();
+            if (entry.getValue().getMetadata() == null) {
+                keyMetadataArray[index] = Collections.emptyList();
+            } else {
+                keyMetadataArray[index] = entry.getValue().getMetadata().entrySet().stream()
+                        .map(e -> new KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                        .collect(Collectors.toList());
+            }
+            builder.addEncryptionKey(keyNameArray[index]);
+            // Move to the next slot so that each key keeps its own name/value/metadata; without this every
+            // entry would overwrite slot 0 and the remaining slots would stay null.
+            index++;
+        }
+        // Background: The order of message payload process during message sending:
+        //  1. The Producer will composite several message payloads into a batched message payload if the
+        //     producer is enabled batch;
+        //  2. The Producer will compress the batched message payload to a compressed payload if enabled
+        //     compression;
+        //  3. After the previous two steps, the Producer encrypts the compressed payload to an encrypted payload.
+        //
+        // Since the order of producer operation for message payloads is "compression --> encryption", users need
+        // to handle Compression themselves if needed. We just disable server-side batch process, server-side
+        // compression, and server-side encryption, and only set the corresponding message metadata.
+        builder.enableBatching(false);
+        // Disable server-side compression, and just set compression attributes into the message metadata when
+        // sending messages(see the method "onWebSocketText").
+        builder.compressionType(CompressionType.NONE);
+        // Disable server-side encryption, and just set encryption attributes into the message metadata when
+        // sending messages(see the method "onWebSocketText").
+        builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
+        // Set the param `enableChunking` to `false`(the default value is `false`) to prevent unexpected problems
+        // if the default setting is changed in the future.
+        builder.enableChunking(false);
+        // Inject encryption metadata decorator.
+        builder.messageCrypto(new WSSDummyMessageCryptoImpl(msgMetadata -> {
+            for (int i = 0; i < keyNameArray.length; i++) {
+                msgMetadata.addEncryptionKey().setKey(keyNameArray[i]).setValue(keyValueArray[i])
+                        .addAllMetadatas(keyMetadataArray[i]);
+            }
+        }));
+        // Do warning param check and print warning log.
+        printLogIfSettingDiscardedBatchedParams();
+        printLogIfSettingDiscardedCompressionParams();
+    }
- if (queryParams.containsKey("messageRoutingMode")) {
- checkArgument(
- Enums.getIfPresent(MessageRoutingMode.class,
queryParams.get("messageRoutingMode")).isPresent(),
- "Invalid messageRoutingMode %s",
queryParams.get("messageRoutingMode"));
- MessageRoutingMode routingMode =
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
- if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
- builder.messageRoutingMode(routingMode);
+ private void
popularProducerBuilderForServerSideEncrypt(ProducerBuilder<byte[]> builder) {
+ this.clientSideEncrypt = false;
+ if (queryParams.containsKey("batchingEnabled")) {
+ boolean batchingEnabled =
Boolean.parseBoolean(queryParams.get("batchingEnabled"));
+ if (batchingEnabled) {
+ builder.enableBatching(true);
+ if (queryParams.containsKey("batchingMaxMessages")) {
+
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+ }
+
+ if (queryParams.containsKey("maxPendingMessages")) {
+
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+ }
+
+ if (queryParams.containsKey("batchingMaxPublishDelay")) {
+
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+ TimeUnit.MILLISECONDS);
+ }
+ } else {
+ builder.enableBatching(false);
+ printLogIfSettingDiscardedBatchedParams();
}
}
@@ -356,7 +499,27 @@ public class ProducerHandler extends
AbstractWebSocketHandler {
builder.addEncryptionKey(key);
}
}
- return builder;
+ }
+
+ private void printLogIfSettingDiscardedBatchedParams() {
+ if (clientSideEncrypt && queryParams.containsKey("batchingEnabled")) {
+ log.info("Since clientSideEncrypt is true, the param
batchingEnabled of producer will be ignored");
+ }
+ if (queryParams.containsKey("batchingMaxMessages")) {
+ log.info("Since batchingEnabled is false, the param
batchingMaxMessages of producer will be ignored");
+ }
+ if (queryParams.containsKey("maxPendingMessages")) {
+ log.info("Since batchingEnabled is false, the param
maxPendingMessages of producer will be ignored");
+ }
+ if (queryParams.containsKey("batchingMaxPublishDelay")) {
+ log.info("Since batchingEnabled is false, the param
batchingMaxPublishDelay of producer will be ignored");
+ }
+ }
+
+ private void printLogIfSettingDiscardedCompressionParams() {
+ if (clientSideEncrypt && queryParams.containsKey("compressionType")) {
+ log.info("Since clientSideEncrypt is true, the param
compressionType of producer will be ignored");
+ }
}
private static final Logger log =
LoggerFactory.getLogger(ProducerHandler.class);
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
index 4831b905514..12cb3b20c19 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
@@ -23,6 +23,7 @@ import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.pulsar.common.api.proto.CompressionType;
/**
* Class represent single message to be published.
@@ -69,4 +70,13 @@ public class ProducerMessage {
// Base64 encoded serialized schema for payload
public String valueSchema;
+
+ // Base64 encoded serialized initialization vector used when the client
encrypts.
+ public String encryptionParam;
+
+ // Compression type. Do not set it if compression is not performed.
+ public CompressionType compressionType;
+
+ // The size of the payload before compression. Do not set it if
compression is not performed.
+ public Integer uncompressedMessageSize;
}
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
new file mode 100644
index 00000000000..43f7a368bbb
--- /dev/null
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WSSDummyMessageCryptoImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.service;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+
+/***
+ * This class is used in scenarios where the payload of the message has been
encrypted and the producer does not need
+ * to encrypt it again.
+ * It discards payload encryption and relies only on {@link
#metadataModifierForSend} to set the encryption info into the
+ * message metadata.
+ */
+public class WSSDummyMessageCryptoImpl implements
MessageCrypto<MessageMetadata, MessageMetadata> {
+
+ public static final WSSDummyMessageCryptoImpl INSTANCE_FOR_CONSUMER =
+ new WSSDummyMessageCryptoImpl(msgMetadata -> {});
+
+ private final Consumer<MessageMetadata> metadataModifierForSend;
+
+ public WSSDummyMessageCryptoImpl(Consumer<MessageMetadata>
metadataModifierForSend) {
+ this.metadataModifierForSend = metadataModifierForSend;
+ }
+
+ @Override
+ public void addPublicKeyCipher(Set keyNames, CryptoKeyReader keyReader)
+ throws PulsarClientException.CryptoException {}
+
+ @Override
+ public boolean removeKeyCipher(String keyName) {
+ return true;
+ }
+
+ @Override
+ public int getMaxOutputSize(int inputLen) {
+ return inputLen;
+ }
+
+ @Override
+ public boolean decrypt(Supplier<MessageMetadata> messageMetadataSupplier,
ByteBuffer payload, ByteBuffer outBuffer,
+ CryptoKeyReader keyReader) {
+ outBuffer.put(payload);
+ outBuffer.flip();
+ return true;
+ }
+
+ @Override
+ public synchronized void encrypt(Set<String> encKeys, CryptoKeyReader
keyReader,
+ Supplier<MessageMetadata>
messageMetadataSupplier,
+ ByteBuffer payload, ByteBuffer outBuffer)
throws PulsarClientException {
+ outBuffer.put(payload);
+ outBuffer.flip();
+ metadataModifierForSend.accept(messageMetadataSupplier.get());
+ }
+}
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
index 5f773a8e2e1..d09b5941f24 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ProducerHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@@ -53,7 +54,7 @@ public class ProducerHandlerTest {
PulsarClient pulsarClient = mock(PulsarClient.class);
ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
Producer producer = mock(Producer.class);
- TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
+ TypedMessageBuilder messageBuilder =
mock(TypedMessageBuilderImpl.class);
ProducerMessage produceRequest = new ProducerMessage();
produceRequest.setDeliverAfterMs(11111);