poorbarcode commented on code in PR #20958:
URL: https://github.com/apache/pulsar/pull/20958#discussion_r1301138415


##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]> 
getProducerBuilder(PulsarClient client) {
             
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
-        if (queryParams.containsKey("batchingEnabled")) {
-            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+        if (queryParams.containsKey("messageRoutingMode")) {
+            checkArgument(
+                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
+                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
+            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+                builder.messageRoutingMode(routingMode);
+            }
         }
 
-        if (queryParams.containsKey("batchingMaxMessages")) {
-            
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+        Map<String, EncryptionKey> encryptionKeyMap = 
tryToExtractJsonEncryptionKeys();
+        if (encryptionKeyMap != null) {
+            popularProducerBuilderForClientSideEncrypt(builder, 
encryptionKeyMap);
+        } else {
+            popularProducerBuilderForServerSideEncrypt(builder);
         }
+        return builder;
+    }
 
-        if (queryParams.containsKey("maxPendingMessages")) {
-            
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+    private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+        if (!queryParams.containsKey("encryptionKeys")) {
+            return null;
+        }
+        // Base64 decode.
+        byte[] param = null;
+        try {
+            param = 
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+        } catch (Exception base64DecodeEx) {
+            log.error("Could not base64 decode the param encryptionKeys", 
base64DecodeEx);
+            throw new IllegalArgumentException("Could not base64 decode the 
param encryptionKeys");
+        }
+        try {
+            Map<String, EncryptionKey> keys = 
ObjectMapperFactory.getMapper().getObjectMapper()
+                    .readValue(param, new TypeReference<Map<String, 
EncryptionKey>>() {
+                    });
+            if (keys.isEmpty()) {
+                return null;
+            }
+            if (keys.values().iterator().next().getKeyValue() == null) {
+                return null;
+            }
+            return keys;
+        } catch (IOException ex) {
+            return null;
         }
+    }
 
-        if (queryParams.containsKey("batchingMaxPublishDelay")) {
-            
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
-                    TimeUnit.MILLISECONDS);
+    private void 
popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+                                                            Map<String, 
EncryptionKey> encryptionKeyMap) {
+        this.clientSideEncrypt = true;
+        int keysLen = encryptionKeyMap.size();
+        final String[] keyNameArray = new String[keysLen];
+        final byte[][] keyValueArray = new byte[keysLen][];
+        final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+        // Format keys.
+        int index = 0;
+        for (Map.Entry<String, EncryptionKey> entry : 
encryptionKeyMap.entrySet()) {
+            checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param 
encryptionKeys.key");
+            checkArgument(entry.getValue() != null, "Empty param 
encryptionKeys.value");
+            checkArgument(entry.getValue().getKeyValue() != null, "Empty param 
encryptionKeys.value.keyValue");
+            keyNameArray[index] = StringUtils.trim(entry.getKey());
+            keyValueArray[index] = entry.getValue().getKeyValue();
+            if (entry.getValue().getMetadata() == null) {
+                keyMetadataArray[index] = Collections.emptyList();
+            } else {
+                keyMetadataArray[index] = 
entry.getValue().getMetadata().entrySet().stream()
+                        .map(e -> new 
KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                        .collect(Collectors.toList());
+            }
+            builder.addEncryptionKey(keyNameArray[index]);
         }
+        // Disable server-side batch process, compression, and encryption, and 
only set the message metadata that
+        // which client-side provided.
+        builder.enableBatching(false);
+        builder.compressionType(CompressionType.NONE);
+        builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);
+        // Disable chunking.
+        builder.enableChunking(false);
+        // Inject encryption metadata decorator.
+        builder.messageCrypto(new WSSDummyMessageCryptoImpl(msgMetadata -> {
+            for (int i = 0; i < keyNameArray.length; i++) {
+                
msgMetadata.addEncryptionKey().setKey(keyNameArray[i]).setValue(keyValueArray[i])
+                        .addAllMetadatas(keyMetadataArray[i]);
+            }
+        }));
+        // Do warning param check and print warning log.
+        printWarnLogIfSettingBatchedParams();
+        printWarnLogIfSettingCompressionParams();
+    }
 
-        if (queryParams.containsKey("messageRoutingMode")) {
-            checkArgument(
-                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
-                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
-            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
-            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
-                builder.messageRoutingMode(routingMode);
+    private void 
popularProducerBuilderForServerSideEncrypt(ProducerBuilder<byte[]> builder) {
+        this.clientSideEncrypt = false;
+        if (queryParams.containsKey("batchingEnabled")) {
+            boolean batchingEnabled = 
Boolean.parseBoolean(queryParams.get("batchingEnabled"));
+            if (batchingEnabled) {
+                builder.enableBatching(true);
+                if (queryParams.containsKey("batchingMaxMessages")) {
+                    
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+                }
+
+                if (queryParams.containsKey("maxPendingMessages")) {
+                    
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+                }
+
+                if (queryParams.containsKey("batchingMaxPublishDelay")) {
+                    
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
+                            TimeUnit.MILLISECONDS);
+                }
+            } else {
+                builder.enableBatching(false);
+                printWarnLogIfSettingBatchedParams();

Review Comment:
   No, it is the original design: Web socket proxy will composite multi 
messages into a batched message if users set the param "batchingEnabled" as 
`true`. I just moved this code into the method 
`popularProducerBuilderForServerSideEncrypt()`



##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]> 
getProducerBuilder(PulsarClient client) {
             
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
-        if (queryParams.containsKey("batchingEnabled")) {
-            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+        if (queryParams.containsKey("messageRoutingMode")) {
+            checkArgument(
+                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
+                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
+            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+                builder.messageRoutingMode(routingMode);
+            }
         }
 
-        if (queryParams.containsKey("batchingMaxMessages")) {
-            
builder.batchingMaxMessages(Integer.parseInt(queryParams.get("batchingMaxMessages")));
+        Map<String, EncryptionKey> encryptionKeyMap = 
tryToExtractJsonEncryptionKeys();
+        if (encryptionKeyMap != null) {
+            popularProducerBuilderForClientSideEncrypt(builder, 
encryptionKeyMap);
+        } else {
+            popularProducerBuilderForServerSideEncrypt(builder);
         }
+        return builder;
+    }
 
-        if (queryParams.containsKey("maxPendingMessages")) {
-            
builder.maxPendingMessages(Integer.parseInt(queryParams.get("maxPendingMessages")));
+    private Map<String, EncryptionKey> tryToExtractJsonEncryptionKeys() {
+        if (!queryParams.containsKey("encryptionKeys")) {
+            return null;
+        }
+        // Base64 decode.
+        byte[] param = null;
+        try {
+            param = 
Base64.getDecoder().decode(StringUtils.trim(queryParams.get("encryptionKeys")));
+        } catch (Exception base64DecodeEx) {
+            log.error("Could not base64 decode the param encryptionKeys", 
base64DecodeEx);
+            throw new IllegalArgumentException("Could not base64 decode the 
param encryptionKeys");
+        }
+        try {
+            Map<String, EncryptionKey> keys = 
ObjectMapperFactory.getMapper().getObjectMapper()
+                    .readValue(param, new TypeReference<Map<String, 
EncryptionKey>>() {
+                    });
+            if (keys.isEmpty()) {
+                return null;
+            }
+            if (keys.values().iterator().next().getKeyValue() == null) {
+                return null;
+            }
+            return keys;
+        } catch (IOException ex) {
+            return null;
         }
+    }
 
-        if (queryParams.containsKey("batchingMaxPublishDelay")) {
-            
builder.batchingMaxPublishDelay(Integer.parseInt(queryParams.get("batchingMaxPublishDelay")),
-                    TimeUnit.MILLISECONDS);
+    private void 
popularProducerBuilderForClientSideEncrypt(ProducerBuilder<byte[]> builder,
+                                                            Map<String, 
EncryptionKey> encryptionKeyMap) {
+        this.clientSideEncrypt = true;
+        int keysLen = encryptionKeyMap.size();
+        final String[] keyNameArray = new String[keysLen];
+        final byte[][] keyValueArray = new byte[keysLen][];
+        final List<KeyValue>[] keyMetadataArray = new List[keysLen];
+        // Format keys.
+        int index = 0;
+        for (Map.Entry<String, EncryptionKey> entry : 
encryptionKeyMap.entrySet()) {
+            checkArgument(StringUtils.isNotBlank(entry.getKey()), "Empty param 
encryptionKeys.key");
+            checkArgument(entry.getValue() != null, "Empty param 
encryptionKeys.value");
+            checkArgument(entry.getValue().getKeyValue() != null, "Empty param 
encryptionKeys.value.keyValue");
+            keyNameArray[index] = StringUtils.trim(entry.getKey());
+            keyValueArray[index] = entry.getValue().getKeyValue();
+            if (entry.getValue().getMetadata() == null) {
+                keyMetadataArray[index] = Collections.emptyList();
+            } else {
+                keyMetadataArray[index] = 
entry.getValue().getMetadata().entrySet().stream()
+                        .map(e -> new 
KeyValue().setKey(e.getKey()).setValue(e.getValue()))
+                        .collect(Collectors.toList());
+            }
+            builder.addEncryptionKey(keyNameArray[index]);
         }
+        // Disable server-side batch process, compression, and encryption, and 
only set the message metadata that
+        // which client-side provided.
+        builder.enableBatching(false);
+        builder.compressionType(CompressionType.NONE);
+        builder.cryptoKeyReader(DummyCryptoKeyReaderImpl.INSTANCE);

Review Comment:
   Done



##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java:
##########
@@ -315,30 +351,117 @@ protected ProducerBuilder<byte[]> 
getProducerBuilder(PulsarClient client) {
             
builder.sendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), 
TimeUnit.MILLISECONDS);
         }
 
-        if (queryParams.containsKey("batchingEnabled")) {
-            
builder.enableBatching(Boolean.parseBoolean(queryParams.get("batchingEnabled")));
+        if (queryParams.containsKey("messageRoutingMode")) {
+            checkArgument(
+                    Enums.getIfPresent(MessageRoutingMode.class, 
queryParams.get("messageRoutingMode")).isPresent(),
+                    "Invalid messageRoutingMode %s", 
queryParams.get("messageRoutingMode"));
+            MessageRoutingMode routingMode = 
MessageRoutingMode.valueOf(queryParams.get("messageRoutingMode"));
+            if (!MessageRoutingMode.CustomPartition.equals(routingMode)) {
+                builder.messageRoutingMode(routingMode);
+            }

Review Comment:
   Ah, it is the [original design](https://github.com/apache/pulsar/pull/862), 
I've not changed it.
   
   See: https://github.com/apache/pulsar/pull/862



##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java:
##########
@@ -35,4 +35,5 @@ public class ConsumerMessage {
     public EncryptionContext encryptionContext;
 
     public String key;
+    // TODO support ackSet in the future.

Review Comment:
   it was removed



##########
pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java:
##########
@@ -69,4 +70,13 @@ public class ProducerMessage {
 
     // Base64 encoded serialized schema for payload
     public String valueSchema;
+
+    // Compression type. Do not set it if compression is not performed.
+    public String encryptionParam;
+
+    // The size of the payload before compression. Do not set it if 
compression is not performed.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to