This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 5e8a2138fc0 [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345)
5e8a2138fc0 is described below

commit 5e8a2138fc0647ee585ef4e6e0dd6f311349eaae
Author: Christophe Bornet <[email protected]>
AuthorDate: Wed Nov 16 01:45:24 2022 +0100

    [fix][client] Fix exception when calling loadConf on a ConsumerBuilder that has a KeySharedPolicy (#18345)
    
    (cherry picked from commit 9c2ec5e218b4743430d654c84f7e26f663b126f1)
---
 .../impl/conf/ConsumerConfigurationData.java       |   1 +
 .../client/impl/ConsumerBuilderImplTest.java       | 232 +++++++++++++++++++++
 2 files changed, 233 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index cf1f60d8e93..ab2527b4932 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -149,6 +149,7 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean resetIncludeHead = false;
 
+    @JsonIgnore
     private transient KeySharedPolicy keySharedPolicy;
 
     private boolean batchIndexAckEnabled = false;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index a04505ddee1..ad390c5d132 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,12 +18,19 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -31,15 +38,32 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.MessageCrypto;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.MessagePayloadProcessor;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RedeliveryBackoff;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 /**
  * Unit tests of {@link ConsumerBuilderImpl}.
  */
@@ -331,4 +355,212 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.startPaused(true);
         verify(consumerBuilderImpl.getConf()).setStartPaused(true);
     }
+    @Test
+    public void testLoadConf() throws Exception {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+        String jsonConf = ("{\n"
+            + "    'topicNames' : [ 'new-topic' ],\n"
+            + "    'topicsPattern' : 'new-topics-pattern',\n"
+            + "    'subscriptionName' : 'new-subscription',\n"
+            + "    'subscriptionType' : 'Key_Shared',\n"
+            + "    'subscriptionProperties' : {\n"
+            + "      'new-sub-prop' : 'new-sub-prop-value'\n"
+            + "    },\n"
+            + "    'subscriptionMode' : 'NonDurable',\n"
+            + "    'receiverQueueSize' : 2,\n"
+            + "    'acknowledgementsGroupTimeMicros' : 2,\n"
+            + "    'negativeAckRedeliveryDelayMicros' : 2,\n"
+            + "    'maxTotalReceiverQueueSizeAcrossPartitions' : 2,\n"
+            + "    'consumerName' : 'new-consumer',\n"
+            + "    'ackTimeoutMillis' : 2,\n"
+            + "    'tickDurationMillis' : 2,\n"
+            + "    'priorityLevel' : 2,\n"
+            + "    'maxPendingChunkedMessage' : 2,\n"
+            + "    'autoAckOldestChunkedMessageOnQueueFull' : true,\n"
+            + "    'expireTimeOfIncompleteChunkedMessageMillis' : 2,\n"
+            + "    'cryptoFailureAction' : 'DISCARD',\n"
+            + "    'properties' : {\n"
+            + "      'new-prop' : 'new-prop-value'\n"
+            + "    },\n"
+            + "    'readCompacted' : true,\n"
+            + "    'subscriptionInitialPosition' : 'Earliest',\n"
+            + "    'patternAutoDiscoveryPeriod' : 2,\n"
+            + "    'regexSubscriptionMode' : 'AllTopics',\n"
+            + "    'deadLetterPolicy' : {\n"
+            + "      'retryLetterTopic' : 'new-retry',\n"
+            + "      'initialSubscriptionName' : 'new-dlq-sub',\n"
+            + "      'deadLetterTopic' : 'new-dlq',\n"
+            + "      'maxRedeliverCount' : 2\n"
+            + "    },\n"
+            + "    'retryEnable' : true,\n"
+            + "    'autoUpdatePartitions' : false,\n"
+            + "    'autoUpdatePartitionsIntervalSeconds' : 2,\n"
+            + "    'replicateSubscriptionState' : true,\n"
+            + "    'resetIncludeHead' : true,\n"
+            + "    'batchIndexAckEnabled' : true,\n"
+            + "    'ackReceiptEnabled' : true,\n"
+            + "    'poolMessages' : true,\n"
+            + "    'startPaused' : true\n"
+            + "  }").replace("'", "\"");
+
+        Map<String, Object> conf = new ObjectMapper().readValue(jsonConf, new 
TypeReference<HashMap<String,Object>>() {});
+
+        MessageListener<byte[]> messageListener = (consumer, message) -> {};
+        conf.put("messageListener", messageListener);
+        ConsumerEventListener consumerEventListener = 
mock(ConsumerEventListener.class);
+        conf.put("consumerEventListener", consumerEventListener);
+        RedeliveryBackoff negativeAckRedeliveryBackoff = 
MultiplierRedeliveryBackoff.builder().build();
+        conf.put("negativeAckRedeliveryBackoff", negativeAckRedeliveryBackoff);
+        RedeliveryBackoff ackTimeoutRedeliveryBackoff = 
MultiplierRedeliveryBackoff.builder().build();;
+        conf.put("ackTimeoutRedeliveryBackoff", ackTimeoutRedeliveryBackoff);
+        CryptoKeyReader cryptoKeyReader = 
DefaultCryptoKeyReader.builder().build();
+        conf.put("cryptoKeyReader", cryptoKeyReader);
+        MessageCrypto messageCrypto = new MessageCryptoBc("ctx2", true);
+        conf.put("messageCrypto", messageCrypto);
+        BatchReceivePolicy batchReceivePolicy = 
BatchReceivePolicy.builder().maxNumBytes(2).build();
+        conf.put("batchReceivePolicy", batchReceivePolicy);
+        KeySharedPolicy keySharedPolicy = KeySharedPolicy.stickyHashRange();
+        conf.put("keySharedPolicy", keySharedPolicy);
+        MessagePayloadProcessor payloadProcessor = 
mock(MessagePayloadProcessor.class);
+        conf.put("payloadProcessor", payloadProcessor);
+
+        consumerBuilder.loadConf(conf);
+
+        ConsumerConfigurationData<byte[]> configurationData = 
consumerBuilder.getConf();
+        assertEquals(configurationData.getTopicNames(), new 
HashSet<>(Collections.singletonList("new-topic")));
+        assertEquals(configurationData.getTopicsPattern().pattern(), 
"new-topics-pattern");
+        assertEquals(configurationData.getSubscriptionName(), 
"new-subscription");
+        assertEquals(configurationData.getSubscriptionType(), 
SubscriptionType.Key_Shared);
+        assertThat(configurationData.getSubscriptionProperties()).hasSize(1)
+            .hasFieldOrPropertyWithValue("new-sub-prop", "new-sub-prop-value");
+        assertEquals(configurationData.getSubscriptionMode(), 
SubscriptionMode.NonDurable);
+        assertEquals(configurationData.getReceiverQueueSize(), 2);
+        assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 
2);
+
+        assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 
2);
+        
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 
2);
+        assertEquals(configurationData.getConsumerName(), "new-consumer");
+        assertEquals(configurationData.getAckTimeoutMillis(), 2);
+        assertEquals(configurationData.getTickDurationMillis(), 2);
+        assertEquals(configurationData.getPriorityLevel(), 2);
+        assertEquals(configurationData.getMaxPendingChunkedMessage(), 2);
+        
assertTrue(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+        
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 
2);
+        assertEquals(configurationData.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.DISCARD);
+        assertThat(configurationData.getProperties()).hasSize(1)
+            .hasFieldOrPropertyWithValue("new-prop", "new-prop-value");
+        assertTrue(configurationData.isReadCompacted());
+        assertEquals(configurationData.getSubscriptionInitialPosition(), 
SubscriptionInitialPosition.Earliest);
+        assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 2);
+        assertEquals(configurationData.getRegexSubscriptionMode(), 
RegexSubscriptionMode.AllTopics);
+        
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), 
"new-dlq");
+        
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), 
"new-retry");
+        
assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(),
 "new-dlq-sub");
+        
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 2);
+        assertTrue(configurationData.isRetryEnable());
+        assertFalse(configurationData.isAutoUpdatePartitions());
+        
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
+        assertTrue(configurationData.isReplicateSubscriptionState());
+        assertTrue(configurationData.isResetIncludeHead());
+        assertTrue(configurationData.isBatchIndexAckEnabled());
+        assertTrue(configurationData.isAckReceiptEnabled());
+        assertTrue(configurationData.isPoolMessages());
+        assertTrue(configurationData.isStartPaused());
+
+        assertNull(configurationData.getMessageListener());
+        assertNull(configurationData.getConsumerEventListener());
+        assertNull(configurationData.getNegativeAckRedeliveryBackoff());
+        assertNull(configurationData.getAckTimeoutRedeliveryBackoff());
+        assertNull(configurationData.getMessageListener());
+        assertNull(configurationData.getMessageCrypto());
+        assertNull(configurationData.getCryptoKeyReader());
+        assertNull(configurationData.getBatchReceivePolicy());
+        assertNull(configurationData.getKeySharedPolicy());
+        assertNull(configurationData.getPayloadProcessor());
+    }
+
+    @Test
+    public void testLoadConfNotModified() {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+
+        consumerBuilder.loadConf(new HashMap<>());
+
+        ConsumerConfigurationData<byte[]> configurationData = 
consumerBuilder.getConf();
+        assertEquals(configurationData.getTopicNames(), new 
HashSet<>(Collections.singletonList("topic")));
+        assertEquals(configurationData.getTopicsPattern().pattern(), 
"topics-pattern");
+        assertEquals(configurationData.getSubscriptionName(), "subscription");
+        assertEquals(configurationData.getSubscriptionType(), 
SubscriptionType.Exclusive);
+        assertThat(configurationData.getSubscriptionProperties()).hasSize(1)
+            .hasFieldOrPropertyWithValue("sub-prop", "sub-prop-value");
+        assertEquals(configurationData.getSubscriptionMode(), 
SubscriptionMode.Durable);
+        assertEquals(configurationData.getReceiverQueueSize(), 1000);
+        assertEquals(configurationData.getAcknowledgementsGroupTimeMicros(), 
TimeUnit.MILLISECONDS.toMicros(100));
+        assertEquals(configurationData.getNegativeAckRedeliveryDelayMicros(), 
TimeUnit.MINUTES.toMicros(1));
+        
assertEquals(configurationData.getMaxTotalReceiverQueueSizeAcrossPartitions(), 
50000);
+        assertEquals(configurationData.getConsumerName(), "consumer");
+        assertEquals(configurationData.getAckTimeoutMillis(), 30000);
+        assertEquals(configurationData.getTickDurationMillis(), 1000);
+        assertEquals(configurationData.getPriorityLevel(), 0);
+        assertEquals(configurationData.getMaxPendingChunkedMessage(), 10);
+        
assertFalse(configurationData.isAutoAckOldestChunkedMessageOnQueueFull());
+        
assertEquals(configurationData.getExpireTimeOfIncompleteChunkedMessageMillis(), 
TimeUnit.MINUTES.toMillis(1));
+        assertEquals(configurationData.getCryptoFailureAction(), 
ConsumerCryptoFailureAction.FAIL);
+        assertThat(configurationData.getProperties()).hasSize(1)
+            .hasFieldOrPropertyWithValue("prop", "prop-value");
+        assertFalse(configurationData.isReadCompacted());
+        assertEquals(configurationData.getSubscriptionInitialPosition(), 
SubscriptionInitialPosition.Latest);
+        assertEquals(configurationData.getPatternAutoDiscoveryPeriod(), 60);
+        assertEquals(configurationData.getRegexSubscriptionMode(), 
RegexSubscriptionMode.PersistentOnly);
+        
assertEquals(configurationData.getDeadLetterPolicy().getDeadLetterTopic(), 
"dlq");
+        
assertEquals(configurationData.getDeadLetterPolicy().getRetryLetterTopic(), 
"retry");
+        
assertEquals(configurationData.getDeadLetterPolicy().getInitialSubscriptionName(),
 "dlq-sub");
+        
assertEquals(configurationData.getDeadLetterPolicy().getMaxRedeliverCount(), 1);
+        assertFalse(configurationData.isRetryEnable());
+        assertTrue(configurationData.isAutoUpdatePartitions());
+        
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
+        assertFalse(configurationData.isReplicateSubscriptionState());
+        assertFalse(configurationData.isResetIncludeHead());
+        assertFalse(configurationData.isBatchIndexAckEnabled());
+        assertFalse(configurationData.isAckReceiptEnabled());
+        assertFalse(configurationData.isPoolMessages());
+        assertFalse(configurationData.isStartPaused());
+
+        assertNull(configurationData.getMessageListener());
+        assertNull(configurationData.getConsumerEventListener());
+        assertNull(configurationData.getNegativeAckRedeliveryBackoff());
+        assertNull(configurationData.getAckTimeoutRedeliveryBackoff());
+        assertNull(configurationData.getMessageListener());
+        assertNull(configurationData.getMessageCrypto());
+        assertNull(configurationData.getCryptoKeyReader());
+        assertNull(configurationData.getBatchReceivePolicy());
+        assertNull(configurationData.getKeySharedPolicy());
+        assertNull(configurationData.getPayloadProcessor());
+    }
+
+    private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = new 
ConsumerBuilderImpl<>(null, Schema.BYTES);
+        Map<String, String> properties = new HashMap<>();
+        properties.put("prop", "prop-value");
+        Map<String, String> subscriptionProperties = new HashMap<>();
+        subscriptionProperties.put("sub-prop", "sub-prop-value");
+        consumerBuilder
+            .topic("topic")
+            .topicsPattern("topics-pattern")
+            .subscriptionName("subscription")
+            .subscriptionProperties(subscriptionProperties)
+            .messageListener((consumer, message) -> {})
+            .consumerEventListener(mock(ConsumerEventListener.class))
+            
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
+            
.ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().build())
+            .consumerName("consumer")
+            .cryptoKeyReader(DefaultCryptoKeyReader.builder().build())
+            .messageCrypto(new MessageCryptoBc("ctx1", true))
+            .properties(properties)
+            
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic("dlq").retryLetterTopic("retry").initialSubscriptionName("dlq-sub").maxRedeliverCount(1).build())
+            
.batchReceivePolicy(BatchReceivePolicy.builder().maxNumBytes(1).build())
+            .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
+            .messagePayloadProcessor(mock(MessagePayloadProcessor.class));
+        return consumerBuilder;
+    }
 }

Reply via email to