This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fce3097c76 [improve][client] Make replicateSubscriptionState nullable (#23757)
3fce3097c76 is described below

commit 3fce3097c76a9c8cb64cf3d8d87f6e050e6cb3a5
Author: Zixuan Liu <[email protected]>
AuthorDate: Fri Dec 20 20:58:03 2024 +0800

    [improve][client] Make replicateSubscriptionState nullable (#23757)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +-
 .../pulsar/broker/service/SubscriptionOption.java  |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  4 +-
 .../service/persistent/PersistentSubscription.java | 19 +++--
 .../broker/service/persistent/PersistentTopic.java | 19 ++---
 .../client/api/ReplicateSubscriptionTest.java      | 96 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  2 +-
 .../impl/conf/ConsumerConfigurationData.java       | 14 +++-
 .../client/impl/ConsumerBuilderImplTest.java       | 36 +++++++-
 .../apache/pulsar/common/protocol/Commands.java    |  8 +-
 10 files changed, 176 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f9e593345d8..2415930a99a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1241,8 +1241,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 ? subscribe.getStartMessageRollbackDurationSec()
                 : -1;
         final SchemaData schema = subscribe.hasSchema() ? 
getSchema(subscribe.getSchema()) : null;
-        final boolean isReplicated = subscribe.hasReplicateSubscriptionState()
-                && subscribe.isReplicateSubscriptionState();
+        final Boolean isReplicated =
+                subscribe.hasReplicateSubscriptionState() ? 
subscribe.isReplicateSubscriptionState() : null;
         final boolean forceTopicCreation = subscribe.isForceTopicCreation();
         final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
               ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
index af56d023616..328e7618f8c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java
@@ -46,7 +46,7 @@ public class SubscriptionOption {
     private boolean readCompacted;
     private CommandSubscribe.InitialPosition initialPosition;
     private long startMessageRollbackDurationSec;
-    private boolean replicatedSubscriptionStateArg;
+    private Boolean replicatedSubscriptionStateArg;
     private KeySharedMeta keySharedMeta;
     private Optional<Map<String, String>> subscriptionProperties;
     private long consumerEpoch;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 34c2678f847..7cdc8cc11a4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -256,7 +256,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
         return internalSubscribe(option.getCnx(), 
option.getSubscriptionName(), option.getConsumerId(),
                 option.getSubType(), option.getPriorityLevel(), 
option.getConsumerName(),
                 option.getStartMessageId(), option.getMetadata(), 
option.isReadCompacted(),
-                option.getStartMessageRollbackDurationSec(), 
option.isReplicatedSubscriptionStateArg(),
+                option.getStartMessageRollbackDurationSec(), 
option.getReplicatedSubscriptionStateArg(),
                 option.getKeySharedMeta(), 
option.getSubscriptionProperties().orElse(null),
                 option.getSchemaType());
     }
@@ -279,7 +279,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                                                           String consumerName, 
MessageId startMessageId,
                                                           Map<String, String> 
metadata, boolean readCompacted,
                                                           long 
resetStartMessageBackInSec,
-                                                          boolean 
replicateSubscriptionState,
+                                                          Boolean 
replicateSubscriptionState,
                                                           KeySharedMeta 
keySharedMeta,
                                                           Map<String, String> 
subscriptionProperties,
                                                           SchemaType 
schemaType) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 0096f398ada..8cebbd52695 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -133,9 +133,11 @@ public class PersistentSubscription extends 
AbstractSubscription {
     private volatile Map<String, String> subscriptionProperties;
     private volatile CompletableFuture<Void> fenceFuture;
     private volatile CompletableFuture<Void> inProgressResetCursorFuture;
+    private volatile Boolean replicatedControlled;
 
-    static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
-        return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : 
NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
+    static Map<String, Long> getBaseCursorProperties(Boolean isReplicated) {
+        return isReplicated != null && isReplicated ? 
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES :
+                NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
     }
 
     static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) {
@@ -143,19 +145,21 @@ public class PersistentSubscription extends 
AbstractSubscription {
     }
 
     public PersistentSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
-                                  boolean replicated) {
+                                  Boolean replicated) {
         this(topic, subscriptionName, cursor, replicated, 
Collections.emptyMap());
     }
 
     public PersistentSubscription(PersistentTopic topic, String 
subscriptionName, ManagedCursor cursor,
-                                  boolean replicated, Map<String, String> 
subscriptionProperties) {
+                                  Boolean replicated, Map<String, String> 
subscriptionProperties) {
         this.topic = topic;
         this.cursor = cursor;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", 
topicName).add("name", subName).toString();
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, 
subscriptionName, cursor, this);
-        this.setReplicated(replicated);
+        if (replicated != null) {
+            this.setReplicated(replicated);
+        }
         this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
                 ? Collections.emptyMap() : 
Collections.unmodifiableMap(subscriptionProperties);
         if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
@@ -194,6 +198,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
     }
 
     public boolean setReplicated(boolean replicated) {
+        replicatedControlled = replicated;
         ServiceConfiguration config = 
topic.getBrokerService().getPulsar().getConfig();
 
         if (!replicated || !config.isEnableReplicatedSubscriptions()) {
@@ -1557,4 +1562,8 @@ public class PersistentSubscription extends 
AbstractSubscription {
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentSubscription.class);
 
+    @VisibleForTesting
+    public Boolean getReplicatedControlled() {
+        return replicatedControlled;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 056fad2a005..11220d1c955 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -513,7 +513,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 } else {
                     final String subscriptionName = 
Codec.decode(cursor.getName());
                     subscriptions.put(subscriptionName, 
createPersistentSubscription(subscriptionName, cursor,
-                            
PersistentSubscription.isCursorFromReplicatedSubscription(cursor),
+                            
PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null,
                             cursor.getCursorProperties()));
                     // subscription-cursor gets activated by default: 
deactivate as there is no active subscription
                     // right now
@@ -584,7 +584,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     private PersistentSubscription createPersistentSubscription(String 
subscriptionName, ManagedCursor cursor,
-            boolean replicated, Map<String, String> subscriptionProperties) {
+            Boolean replicated, Map<String, String> subscriptionProperties) {
         requireNonNull(topicCompactionService);
         if (isCompactionSubscription(subscriptionName)
                 && topicCompactionService instanceof 
PulsarTopicCompactionService pulsarTopicCompactionService) {
@@ -888,7 +888,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 option.getSubType(), option.getPriorityLevel(), 
option.getConsumerName(), option.isDurable(),
                 option.getStartMessageId(), option.getMetadata(), 
option.isReadCompacted(),
                 option.getInitialPosition(), 
option.getStartMessageRollbackDurationSec(),
-                option.isReplicatedSubscriptionStateArg(), 
option.getKeySharedMeta(),
+                option.getReplicatedSubscriptionStateArg(), 
option.getKeySharedMeta(),
                 
option.getSubscriptionProperties().orElse(Collections.emptyMap()),
                 option.getConsumerEpoch(), option.getSchemaType());
     }
@@ -900,7 +900,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                                                           Map<String, String> 
metadata, boolean readCompacted,
                                                           InitialPosition 
initialPosition,
                                                           long 
startMessageRollbackDurationSec,
-                                                          boolean 
replicatedSubscriptionStateArg,
+                                                          Boolean 
replicatedSubscriptionStateArg,
                                                           KeySharedMeta 
keySharedMeta,
                                                           Map<String, String> 
subscriptionProperties,
                                                           long consumerEpoch,
@@ -911,12 +911,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
 
         return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ 
-> {
-            boolean replicatedSubscriptionState = 
replicatedSubscriptionStateArg;
-
-            if (replicatedSubscriptionState
+            if (replicatedSubscriptionStateArg != null && 
replicatedSubscriptionStateArg
                     && 
!brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
                 log.warn("[{}] Replicated Subscription is disabled by 
broker.", getName());
-                replicatedSubscriptionState = false;
             }
 
             if (subType == SubType.Key_Shared
@@ -985,7 +982,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
             CompletableFuture<? extends Subscription> subscriptionFuture = 
isDurable
                     ? getDurableSubscription(subscriptionName, 
initialPosition, startMessageRollbackDurationSec,
-                            replicatedSubscriptionState, 
subscriptionProperties)
+                    replicatedSubscriptionStateArg, subscriptionProperties)
                     : getNonDurableSubscription(subscriptionName, 
startMessageId, initialPosition,
                     startMessageRollbackDurationSec, readCompacted, 
subscriptionProperties);
 
@@ -1082,7 +1079,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private CompletableFuture<Subscription> getDurableSubscription(String 
subscriptionName,
                                                                    
InitialPosition initialPosition,
                                                                    long 
startMessageRollbackDurationSec,
-                                                                   boolean 
replicated,
+                                                                   Boolean 
replicated,
                                                                    Map<String, 
String> subscriptionProperties) {
         CompletableFuture<Subscription> subscriptionFuture = new 
CompletableFuture<>();
         if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -1113,7 +1110,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                         return;
                     }
                 }
-                if (replicated && !subscription.isReplicated()) {
+                if (replicated != null && replicated && 
!subscription.isReplicated()) {
                     // Flip the subscription state
                     subscription.setReplicated(replicated);
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
new file mode 100644
index 00000000000..327081bf1b9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ReplicateSubscriptionTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+    }
+
+    @DataProvider
+    public Object[] replicateSubscriptionState() {
+        return new Object[]{
+                Boolean.TRUE,
+                Boolean.FALSE,
+                null
+        };
+    }
+
+    @Test(dataProvider = "replicateSubscriptionState")
+    public void testReplicateSubscriptionState(Boolean 
replicateSubscriptionState)
+            throws Exception {
+        String topic = "persistent://my-property/my-ns/" + System.nanoTime();
+        String subName = "sub-" + System.nanoTime();
+        ConsumerBuilder<String> consumerBuilder = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName);
+        if (replicateSubscriptionState != null) {
+            
consumerBuilder.replicateSubscriptionState(replicateSubscriptionState);
+        }
+        ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) 
consumerBuilder;
+        
assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), 
replicateSubscriptionState);
+        @Cleanup
+        Consumer<String> ignored = consumerBuilder.subscribe();
+        CompletableFuture<Optional<Topic>> topicIfExists = 
pulsar.getBrokerService().getTopicIfExists(topic);
+        assertThat(topicIfExists)
+                .succeedsWithin(3, TimeUnit.SECONDS)
+                .matches(optionalTopic -> {
+                    assertTrue(optionalTopic.isPresent());
+                    Topic topicRef = optionalTopic.get();
+                    Subscription subscription = 
topicRef.getSubscription(subName);
+                    assertNotNull(subscription);
+                    assertTrue(subscription instanceof PersistentSubscription);
+                    PersistentSubscription persistentSubscription = 
(PersistentSubscription) subscription;
+                    
assertEquals(persistentSubscription.getReplicatedControlled(), 
replicateSubscriptionState);
+                    return true;
+                });
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4d1b51e34db..16dc70f736e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -902,7 +902,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         synchronized (this) {
             ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(),
                     priorityLevel, consumerName, isDurable, 
startMessageIdData, metadata, readCompacted,
-                    conf.isReplicateSubscriptionState(),
+                    conf.getReplicateSubscriptionState(),
                     
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
                     startMessageRollbackDuration, si, 
createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
                     // Use the current epoch to subscribe.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index f9ff5913f62..6e884ba2791 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.conf;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Sets;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
@@ -381,7 +382,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
             value = "If `replicateSubscriptionState` is enabled, a 
subscription state is replicated to geo-replicated"
                     + " clusters."
     )
-    private boolean replicateSubscriptionState = false;
+    @JsonProperty(access = JsonProperty.Access.READ_WRITE)
+    private Boolean replicateSubscriptionState;
 
     private boolean resetIncludeHead = false;
 
@@ -437,4 +439,14 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
             throw new RuntimeException("Failed to clone 
ConsumerConfigurationData");
         }
     }
+
+    /**
+     * Backward compatibility with the old `replicateSubscriptionState` field.
+     * @deprecated Using {@link #getReplicateSubscriptionState()} instead.
+     */
+    @JsonIgnore
+    @Deprecated
+    public boolean isReplicateSubscriptionState() {
+        return replicateSubscriptionState != null && 
replicateSubscriptionState;
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index e4b7b4d1ec8..c103712d400 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -504,7 +504,7 @@ public class ConsumerBuilderImplTest {
         assertTrue(configurationData.isRetryEnable());
         assertFalse(configurationData.isAutoUpdatePartitions());
         
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2);
-        assertTrue(configurationData.isReplicateSubscriptionState());
+        assertEquals(configurationData.getReplicateSubscriptionState(), 
Boolean.TRUE);
         assertTrue(configurationData.isResetIncludeHead());
         assertTrue(configurationData.isBatchIndexAckEnabled());
         assertTrue(configurationData.isAckReceiptEnabled());
@@ -564,7 +564,7 @@ public class ConsumerBuilderImplTest {
         assertFalse(configurationData.isRetryEnable());
         assertTrue(configurationData.isAutoUpdatePartitions());
         
assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60);
-        assertFalse(configurationData.isReplicateSubscriptionState());
+        assertNull(configurationData.getReplicateSubscriptionState());
         assertFalse(configurationData.isResetIncludeHead());
         assertFalse(configurationData.isBatchIndexAckEnabled());
         assertFalse(configurationData.isAckReceiptEnabled());
@@ -584,6 +584,38 @@ public class ConsumerBuilderImplTest {
         assertNull(configurationData.getPayloadProcessor());
     }
 
+    @Test
+    public void testReplicateSubscriptionState() {
+        ConsumerBuilderImpl<byte[]> consumerBuilder = createConsumerBuilder();
+        assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+
+        consumerBuilder.replicateSubscriptionState(true);
+        
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), 
Boolean.TRUE);
+
+        consumerBuilder.replicateSubscriptionState(false);
+        
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), 
Boolean.FALSE);
+
+        Map<String, Object> conf = new HashMap<>();
+        consumerBuilder = createConsumerBuilder();
+        consumerBuilder.loadConf(conf);
+        assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+
+        conf.put("replicateSubscriptionState", true);
+        consumerBuilder = createConsumerBuilder();
+        consumerBuilder.loadConf(conf);
+        
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), 
Boolean.TRUE);
+
+        conf.put("replicateSubscriptionState", false);
+        consumerBuilder = createConsumerBuilder();
+        consumerBuilder.loadConf(conf);
+        
assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), 
Boolean.FALSE);
+
+        conf.put("replicateSubscriptionState", null);
+        consumerBuilder = createConsumerBuilder();
+        consumerBuilder.loadConf(conf);
+        assertNull(consumerBuilder.getConf().getReplicateSubscriptionState());
+    }
+
     private ConsumerBuilderImpl<byte[]> createConsumerBuilder() {
         ConsumerBuilderImpl<byte[]> consumerBuilder = new 
ConsumerBuilderImpl<>(null, Schema.BYTES);
         Map<String, String> properties = new HashMap<>();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 19aa9907549..4f390cc99e6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -583,7 +583,7 @@ public class Commands {
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean 
isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted, boolean 
isReplicated,
+            Map<String, String> metadata, boolean readCompacted, Boolean 
isReplicated,
             InitialPosition subscriptionInitialPosition, long 
startMessageRollbackDurationInSec, SchemaInfo schemaInfo,
             boolean createTopicIfDoesNotExist) {
         return newSubscribe(topic, subscription, consumerId, requestId, 
subType, priorityLevel, consumerName,
@@ -594,7 +594,7 @@ public class Commands {
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
                SubType subType, int priorityLevel, String consumerName, 
boolean isDurable, MessageIdData startMessageId,
-               Map<String, String> metadata, boolean readCompacted, boolean 
isReplicated,
+               Map<String, String> metadata, boolean readCompacted, Boolean 
isReplicated,
                InitialPosition subscriptionInitialPosition, long 
startMessageRollbackDurationInSec,
                SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, 
KeySharedPolicy keySharedPolicy,
                Map<String, String> subscriptionProperties, long consumerEpoch) 
{
@@ -610,9 +610,11 @@ public class Commands {
                 .setDurable(isDurable)
                 .setReadCompacted(readCompacted)
                 .setInitialPosition(subscriptionInitialPosition)
-                .setReplicateSubscriptionState(isReplicated)
                 .setForceTopicCreation(createTopicIfDoesNotExist)
                 .setConsumerEpoch(consumerEpoch);
+        if (isReplicated != null) {
+            subscribe.setReplicateSubscriptionState(isReplicated);
+        }
 
         if (subscriptionProperties != null && 
!subscriptionProperties.isEmpty()) {
             List<KeyValue> keyValues = new ArrayList<>();

Reply via email to