This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a59e4c391d [fix][client] Fix client to handle permits for discarded 
and failed decrypt batch-message (#23068)
3a59e4c391d is described below

commit 3a59e4c391d18637438c70bd2dd9fb030f715bf7
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Aug 30 11:24:33 2024 -0700

    [fix][client] Fix client to handle permits for discarded and failed decrypt 
batch-message (#23068)
---
 .../client/api/SimpleProducerConsumerTest.java     | 86 ++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 94 ++++++++++++----------
 2 files changed, 139 insertions(+), 41 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61dd33be64a..2e71e8cc28c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -48,6 +48,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Clock;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -116,6 +117,8 @@ import 
org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import 
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -4862,6 +4865,89 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         assertEquals(consumer.batchReceive().size(), maxBatchSize);
     }
 
+    /**
+    *
+    * This test validates that consumer correctly sends permits for batch 
message that should be discarded.
+    * @throws Exception
+    */
+   @Test
+   public void testEncryptionFailureWithBatchPublish() throws Exception {
+       log.info("-- Starting {} test --", methodName);
+       String topicName = "persistent://my-property/my-ns/batchFailureTest-" + 
System.currentTimeMillis();
+
+       class EncKeyReader implements CryptoKeyReader {
+
+           final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+           @Override
+           public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+               String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+               if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                   try {
+                       
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                       return keyInfo;
+                   } catch (IOException e) {
+                       Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                   }
+               } else {
+                   Assert.fail("Certificate file " + CERT_FILE_PATH + " is not 
present or not readable.");
+               }
+               return null;
+           }
+
+           @Override
+           public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+               String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+               if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                   try {
+                       
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                       return keyInfo;
+                   } catch (IOException e) {
+                       Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                   }
+               } else {
+                   Assert.fail("Certificate file " + CERT_FILE_PATH + " is not 
present or not readable.");
+               }
+               return null;
+           }
+       }
+
+       final int totalMsg = 2000;
+
+       String subName = "without-cryptoreader";
+       @Cleanup
+       Consumer<byte[]> normalConsumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+               .messageListener((c, msg) -> {
+                   log.info("Failed to consume message {}", 
msg.getMessageId());
+                   c.acknowledgeAsync(msg);
+               
}).cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).ackTimeout(1, 
TimeUnit.SECONDS)
+               .receiverQueueSize(totalMsg / 20).subscribe();
+
+       @Cleanup
+       Producer<byte[]> cryptoProducer = 
pulsarClient.newProducer().topic(topicName)
+               
.addEncryptionKey("client-ecdsa.pem").enableBatching(true).batchingMaxMessages(5)
+               .batchingMaxPublishDelay(1, 
TimeUnit.SECONDS).cryptoKeyReader(new EncKeyReader()).create();
+       for (int i = 0; i < totalMsg; i++) {
+           String message = "my-message-" + i;
+           cryptoProducer.sendAsync(message.getBytes());
+       }
+       cryptoProducer.flush();
+
+       Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+           PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topicName);
+           CursorStats stats = internalStats.cursors.get(subName);
+           String readPosition = stats.readPosition;
+           assertEquals(getMessageId(readPosition, 0, 1), 
(getMessageId(internalStats.lastConfirmedEntry, 0, 0)));
+       });
+
+       log.info("-- Exiting {} test --", methodName);
+   }
+
+   private MessageId getMessageId(String messageId, long subLedgerId, long 
subEntryId) {
+       String[] ids = messageId.split(":");
+       return new MessageIdImpl(Long.parseLong(ids[0]) - subLedgerId, 
Long.parseLong(ids[1]) - subEntryId, -1);
+   }
+
     private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl 
messageId2) {
         if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
             return -1;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 596e65484d1..4f041772af3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1892,30 +1892,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (msgMetadata.getEncryptionKeysCount() == 0) {
             return payload.retain();
         }
-
+        int batchSize = msgMetadata.getNumMessagesInBatch();
         // If KeyReader is not configured throw exception based on config param
         if (conf.getCryptoKeyReader() == null) {
-            switch (conf.getCryptoFailureAction()) {
-                case CONSUME:
-                    log.debug("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
-                            topic, subscription, consumerName);
-                    return payload.retain();
-                case DISCARD:
-                    log.warn(
-                            "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and"
-                                    + " config is set to discard",
-                            topic, subscription, consumerName);
-                    discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-                    return null;
-                case FAIL:
-                    MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
-                    log.error(
-                            "[{}][{}][{}][{}] Message delivery failed since 
CryptoKeyReader interface is not"
-                                    + " implemented to consume encrypted 
message",
-                            topic, subscription, consumerName, m);
-                    unAckedMessageTracker.add(m, redeliveryCount);
-                    return null;
-            }
+            return handleCryptoFailure(payload, messageId, currentCnx, 
redeliveryCount, batchSize, true);
         }
 
 
@@ -1929,27 +1909,58 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         decryptedData.release();
 
+        return handleCryptoFailure(payload, messageId, currentCnx, 
redeliveryCount, batchSize, false);
+    }
+
+    private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData 
messageId, ClientCnx currentCnx,
+            int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
+
         switch (conf.getCryptoFailureAction()) {
-            case CONSUME:
+        case CONSUME:
+            if (cryptoReaderNotExist) {
+                log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
+                        topic, subscription, consumerName);
+            } else {
                 // Note, batch message will fail to consume even if config is 
set to consume
                 log.warn("[{}][{}][{}][{}] Decryption failed. Consuming 
encrypted message since config is set to"
-                                + " consume.",
-                        topic, subscription, consumerName, messageId);
-                return payload.retain();
-            case DISCARD:
-                log.warn("[{}][{}][{}][{}] Discarding message since decryption 
failed and config is set to discard",
-                        topic, subscription, consumerName, messageId);
-                discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-                return null;
-            case FAIL:
-                MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+                        + " consume.", topic, subscription, consumerName, 
messageId);
+            }
+            return payload.retain();
+        case DISCARD:
+            if (cryptoReaderNotExist) {
+                log.warn(
+                        "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and"
+                                + " config is set to discard message with 
batch size {}",
+                        topic, subscription, consumerName, batchSize);
+            } else {
+                log.warn(
+                        "[{}][{}][{}][{}-{}-{}] Discarding message since 
decryption failed "
+                                + "and config is set to discard",
+                        topic, subscription, consumerName, 
messageId.getLedgerId(), messageId.getEntryId(),
+                        messageId.getBatchIndex());
+            }
+            discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError, batchSize);
+            return null;
+        case FAIL:
+            if (cryptoReaderNotExist) {
                 log.error(
-                        "[{}][{}][{}][{}] Message delivery failed since unable 
to decrypt incoming message",
-                        topic, subscription, consumerName, m);
-                unAckedMessageTracker.add(m, redeliveryCount);
-                return null;
+                        "[{}][{}][{}][{}-{}-{}] Message delivery failed since 
CryptoKeyReader interface is not"
+                                + " implemented to consume encrypted message",
+                        topic, subscription, consumerName, 
messageId.getLedgerId(), messageId.getEntryId(),
+                        partitionIndex);
+            } else {
+                log.error("[{}][{}][{}][{}-{}-{}] Message delivery failed 
since unable to decrypt incoming message",
+                        topic, subscription, consumerName, 
messageId.getLedgerId(), messageId.getEntryId(),
+                        partitionIndex);
+            }
+            MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+            unAckedMessageTracker.add(m, redeliveryCount);
+            return null;
+        default:
+            log.warn("[{}][{}][{}] Invalid crypto failure state found, 
continue message consumption.", topic,
+                    subscription, consumerName);
+            return payload.retain();
         }
-        return null;
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, 
MessageMetadata msgMetadata, ByteBuf payload,
@@ -2009,14 +2020,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             ValidationError validationError) {
         log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, 
subscription, messageId.getLedgerId(),
                 messageId.getEntryId());
-        discardMessage(messageId, currentCnx, validationError);
+        discardMessage(messageId, currentCnx, validationError, 1);
     }
 
-    private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, 
ValidationError validationError) {
+    private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, 
ValidationError validationError,
+            int batchMessages) {
         ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), 
messageId.getEntryId(), null,
                 AckType.Individual, validationError, Collections.emptyMap(), 
-1);
         currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
-        increaseAvailablePermits(currentCnx);
+        increaseAvailablePermits(currentCnx, batchMessages);
         stats.incrementNumReceiveFailed();
     }
 

Reply via email to