This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a59e4c391d [fix][client] Fix client to handle permits for discarded
and failed decrypt batch-message (#23068)
3a59e4c391d is described below
commit 3a59e4c391d18637438c70bd2dd9fb030f715bf7
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Aug 30 11:24:33 2024 -0700
[fix][client] Fix client to handle permits for discarded and failed decrypt
batch-message (#23068)
---
.../client/api/SimpleProducerConsumerTest.java | 86 ++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 94 ++++++++++++----------
2 files changed, 139 insertions(+), 41 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61dd33be64a..2e71e8cc28c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -48,6 +48,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
@@ -116,6 +117,8 @@ import
org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
+import
org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
@@ -4862,6 +4865,89 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
assertEquals(consumer.batchReceive().size(), maxBatchSize);
}
+ /**
+ *
+ * This test validates that consumer correctly sends permits for batch
message that should be discarded.
+ * @throws Exception
+ */
+ @Test
+ public void testEncryptionFailureWithBatchPublish() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ String topicName = "persistent://my-property/my-ns/batchFailureTest-" +
System.currentTimeMillis();
+
+ class EncKeyReader implements CryptoKeyReader {
+
+ final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not
present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is not
present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ final int totalMsg = 2000;
+
+ String subName = "without-cryptoreader";
+ @Cleanup
+ Consumer<byte[]> normalConsumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .messageListener((c, msg) -> {
+ log.info("Failed to consume message {}",
msg.getMessageId());
+ c.acknowledgeAsync(msg);
+
}).cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).ackTimeout(1,
TimeUnit.SECONDS)
+ .receiverQueueSize(totalMsg / 20).subscribe();
+
+ @Cleanup
+ Producer<byte[]> cryptoProducer =
pulsarClient.newProducer().topic(topicName)
+
.addEncryptionKey("client-ecdsa.pem").enableBatching(true).batchingMaxMessages(5)
+ .batchingMaxPublishDelay(1,
TimeUnit.SECONDS).cryptoKeyReader(new EncKeyReader()).create();
+ for (int i = 0; i < totalMsg; i++) {
+ String message = "my-message-" + i;
+ cryptoProducer.sendAsync(message.getBytes());
+ }
+ cryptoProducer.flush();
+
+ Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(topicName);
+ CursorStats stats = internalStats.cursors.get(subName);
+ String readPosition = stats.readPosition;
+ assertEquals(getMessageId(readPosition, 0, 1),
(getMessageId(internalStats.lastConfirmedEntry, 0, 0)));
+ });
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ private MessageId getMessageId(String messageId, long subLedgerId, long
subEntryId) {
+ String[] ids = messageId.split(":");
+ return new MessageIdImpl(Long.parseLong(ids[0]) - subLedgerId,
Long.parseLong(ids[1]) - subEntryId, -1);
+ }
+
private int compareMessageIds(MessageIdImpl messageId1, MessageIdImpl
messageId2) {
if (messageId2.getLedgerId() < messageId1.getLedgerId()) {
return -1;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 596e65484d1..4f041772af3 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1892,30 +1892,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (msgMetadata.getEncryptionKeysCount() == 0) {
return payload.retain();
}
-
+ int batchSize = msgMetadata.getNumMessagesInBatch();
// If KeyReader is not configured throw exception based on config param
if (conf.getCryptoKeyReader() == null) {
- switch (conf.getCryptoFailureAction()) {
- case CONSUME:
- log.debug("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
- topic, subscription, consumerName);
- return payload.retain();
- case DISCARD:
- log.warn(
- "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and"
- + " config is set to discard",
- topic, subscription, consumerName);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- return null;
- case FAIL:
- MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
- log.error(
- "[{}][{}][{}][{}] Message delivery failed since
CryptoKeyReader interface is not"
- + " implemented to consume encrypted
message",
- topic, subscription, consumerName, m);
- unAckedMessageTracker.add(m, redeliveryCount);
- return null;
- }
+ return handleCryptoFailure(payload, messageId, currentCnx,
redeliveryCount, batchSize, true);
}
@@ -1929,27 +1909,58 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
decryptedData.release();
+ return handleCryptoFailure(payload, messageId, currentCnx,
redeliveryCount, batchSize, false);
+ }
+
+ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData
messageId, ClientCnx currentCnx,
+ int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
+
switch (conf.getCryptoFailureAction()) {
- case CONSUME:
+ case CONSUME:
+ if (cryptoReaderNotExist) {
+ log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
+ topic, subscription, consumerName);
+ } else {
// Note, batch message will fail to consume even if config is
set to consume
log.warn("[{}][{}][{}][{}] Decryption failed. Consuming
encrypted message since config is set to"
- + " consume.",
- topic, subscription, consumerName, messageId);
- return payload.retain();
- case DISCARD:
- log.warn("[{}][{}][{}][{}] Discarding message since decryption
failed and config is set to discard",
- topic, subscription, consumerName, messageId);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- return null;
- case FAIL:
- MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ + " consume.", topic, subscription, consumerName,
messageId);
+ }
+ return payload.retain();
+ case DISCARD:
+ if (cryptoReaderNotExist) {
+ log.warn(
+ "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and"
+ + " config is set to discard message with
batch size {}",
+ topic, subscription, consumerName, batchSize);
+ } else {
+ log.warn(
+ "[{}][{}][{}][{}-{}-{}] Discarding message since
decryption failed "
+ + "and config is set to discard",
+ topic, subscription, consumerName,
messageId.getLedgerId(), messageId.getEntryId(),
+ messageId.getBatchIndex());
+ }
+ discardMessage(messageId, currentCnx,
ValidationError.DecryptionError, batchSize);
+ return null;
+ case FAIL:
+ if (cryptoReaderNotExist) {
log.error(
- "[{}][{}][{}][{}] Message delivery failed since unable
to decrypt incoming message",
- topic, subscription, consumerName, m);
- unAckedMessageTracker.add(m, redeliveryCount);
- return null;
+ "[{}][{}][{}][{}-{}-{}] Message delivery failed since
CryptoKeyReader interface is not"
+ + " implemented to consume encrypted message",
+ topic, subscription, consumerName,
messageId.getLedgerId(), messageId.getEntryId(),
+ partitionIndex);
+ } else {
+ log.error("[{}][{}][{}][{}-{}-{}] Message delivery failed
since unable to decrypt incoming message",
+ topic, subscription, consumerName,
messageId.getLedgerId(), messageId.getEntryId(),
+ partitionIndex);
+ }
+ MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ unAckedMessageTracker.add(m, redeliveryCount);
+ return null;
+ default:
+ log.warn("[{}][{}][{}] Invalid crypto failure state found,
continue message consumption.", topic,
+ subscription, consumerName);
+ return payload.retain();
}
- return null;
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId,
MessageMetadata msgMetadata, ByteBuf payload,
@@ -2009,14 +2020,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic,
subscription, messageId.getLedgerId(),
messageId.getEntryId());
- discardMessage(messageId, currentCnx, validationError);
+ discardMessage(messageId, currentCnx, validationError, 1);
}
- private void discardMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
+ private void discardMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError,
+ int batchMessages) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(),
messageId.getEntryId(), null,
AckType.Individual, validationError, Collections.emptyMap(),
-1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
- increaseAvailablePermits(currentCnx);
+ increaseAvailablePermits(currentCnx, batchMessages);
stats.incrementNumReceiveFailed();
}