This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new deafc24  client: make SubscriptionMode a member of 
ConsumerConfigurationData (#6337)
deafc24 is described below

commit deafc24b205122767ad9ad86e48c591779547e31
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Feb 18 10:32:50 2020 +0800

    client: make SubscriptionMode a member of ConsumerConfigurationData (#6337)
    
    Currently, SubscriptionMode is a parameter used to create ConsumerImpl, but it 
is not exposed publicly, so users cannot set this value for a consumer.  This 
change makes SubscriptionMode a member of ConsumerConfigurationData, so 
users can set this parameter when creating a consumer.
    (cherry picked from commit 208af7cd7e718d378707df4aba600be3202e23df)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  1 -
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 15 ++++++++++
 .../apache/pulsar/client/api/SubscriptionMode.java | 31 +++++++++++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  8 ++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 32 +++++++++++-----------
 .../client/impl/MultiTopicsConsumerImpl.java       | 13 ++++-----
 .../pulsar/client/impl/PulsarClientImpl.java       |  3 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  4 +--
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  4 +--
 .../impl/conf/ConsumerConfigurationData.java       |  3 ++
 .../pulsar/client/impl/ConsumerImplTest.java       | 27 +++++++++++-------
 11 files changed, 101 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index a6db7c6..44dd93f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -110,7 +110,6 @@ public class RawReaderImpl implements RawReader {
                 TopicName.getPartitionIndex(conf.getSingleTopic()),
                 false,
                 consumerFuture,
-                SubscriptionMode.Durable,
                 MessageId.earliest,
                 0 /* startMessageRollbackDurationInSec */,
                 Schema.BYTES, null,
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index f63e95f..0c003de 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -229,6 +229,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
 
     /**
+     * Select the subscription mode to be used when subscribing to the topic.
+     *
+     * <p>Options are:
+     * <ul>
+     *  <li>{@link SubscriptionMode#Durable} (Default)</li>
+     *  <li>{@link SubscriptionMode#NonDurable}</li>
+     * </ul>
+     *
+     * @param subscriptionMode
+     *            the subscription mode value
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
+
+    /**
      * Sets a {@link MessageListener} for the consumer
      *
      * <p>When a {@link MessageListener} is set, application will receive 
messages through it. Calls to
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
new file mode 100644
index 0000000..7e11bbb
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Types of subscription mode supported by Pulsar.
+ */
+public enum SubscriptionMode {
+    // Make the subscription to be backed by a durable cursor that will retain 
messages and persist the current
+    // position
+    Durable,
+
+    // Lightweight subscription mode that doesn't have a durable cursor 
associated
+    NonDurable
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f7848b6..f265f38 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -192,6 +193,13 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode 
subscriptionMode) {
+        conf.setSubscriptionMode(subscriptionMode);
+        return this;
+    }
+
+
+    @Override
     public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> 
messageListener) {
         conf.setMessageListener(messageListener);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7211d2..fce4cc0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import 
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -150,39 +151,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final boolean createTopicIfDoesNotExist;
 
-    enum SubscriptionMode {
-        // Make the subscription to be backed by a durable cursor that will 
retain messages and persist the current
-        // position
-        Durable,
 
-        // Lightweight subscription mode that doesn't have a durable cursor 
associated
-        NonDurable
-    }
-
-    static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String 
topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-            SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema, ConsumerInterceptors<T> interceptors,
-            boolean createTopicIfDoesNotExist) {
+    static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
+                                               String topic,
+                                               ConsumerConfigurationData<T> 
conf,
+                                               ExecutorService 
listenerExecutor,
+                                               int partitionIndex,
+                                               boolean hasParentConsumer,
+                                               CompletableFuture<Consumer<T>> 
subscribeFuture,
+                                               MessageId startMessageId,
+                                               Schema<T> schema,
+                                               ConsumerInterceptors<T> 
interceptors,
+                                               boolean 
createTopicIfDoesNotExist) {
         if (conf.getReceiverQueueSize() == 0) {
             return new ZeroQueueConsumerImpl<>(client, topic, conf, 
listenerExecutor, partitionIndex, hasParentConsumer,
                     subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors,
+                    startMessageId, schema, interceptors,
                     createTopicIfDoesNotExist);
         } else {
             return new ConsumerImpl<>(client, topic, conf, listenerExecutor, 
partitionIndex, hasParentConsumer,
-                    subscribeFuture, subscriptionMode, startMessageId, 0 /* 
rollback time in sec to start msgId */,
+                    subscribeFuture, startMessageId, 0 /* rollback time in sec 
to start msgId */,
                     schema, interceptors, createTopicIfDoesNotExist);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer,
-            CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode 
subscriptionMode, MessageId startMessageId,
+            CompletableFuture<Consumer<T>> subscribeFuture, MessageId 
startMessageId,
             long startMessageRollbackDurationInSec, Schema<T> schema, 
ConsumerInterceptors<T> interceptors,
             boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, conf.getReceiverQueueSize(), 
listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
-        this.subscriptionMode = subscriptionMode;
+        this.subscriptionMode = conf.getSubscriptionMode();
         this.startMessageId = startMessageId != null ? new 
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
         this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest 
: startMessageId;
         this.initialStartMessageId = this.startMessageId;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 9c95d17..3e9e69a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ConsumerName;
@@ -833,10 +832,10 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(client, partitionName,
-                                configurationData, 
client.externalExecutorProvider().getExecutor(),
-                                partitionIndex, true, subFuture,
-                                SubscriptionMode.Durable, null, schema, 
interceptors,
-                                createIfDoesNotExist);
+                            configurationData, 
client.externalExecutorProvider().getExecutor(),
+                            partitionIndex, true, subFuture,
+                            null, schema, interceptors,
+                            createIfDoesNotExist);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         return subFuture;
                     })
@@ -847,7 +846,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, 
topicName, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), -1, true, 
subFuture, SubscriptionMode.Durable, null,
+                    client.externalExecutorProvider().getExecutor(), -1, true, 
subFuture, null,
                     schema, interceptors,
                     createIfDoesNotExist);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -1118,7 +1117,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = 
ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, true, subFuture, 
SubscriptionMode.Durable, null, schema, interceptors,
+                            partitionIndex, true, subFuture, null, schema, 
interceptors,
                             true /* createTopicIfDoesNotExist */);
                         consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
                         if (log.isDebugEnabled()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5a2a6da..f51fb6b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -64,7 +64,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.transaction.TransactionBuilder;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -354,7 +353,7 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 int partitionIndex = TopicName.getPartitionIndex(topic);
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, 
topic, conf, listenerThread, partitionIndex, false,
-                        consumerSubscribedFuture, SubscriptionMode.Durable, 
null, schema, interceptors,
+                        consumerSubscribedFuture,null, schema, interceptors,
                         true /* createTopicIfDoesNotExist */);
             }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index a3ff33c..5799877 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.*;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
@@ -47,6 +46,7 @@ public class ReaderImpl<T> implements Reader<T> {
         
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
         consumerConfiguration.setSubscriptionName(subscription);
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+        consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
         
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
 
@@ -83,7 +83,7 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = 
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, 
readerConfiguration.getTopicName(), consumerConfiguration,
-                listenerExecutor, partitionIdx, false, consumerFuture, 
SubscriptionMode.NonDurable,
+                listenerExecutor, partitionIdx, false, consumerFuture,
                 readerConfiguration.getStartMessageId(), 
readerConfiguration.getStartMessageFromRollbackDurationInSec(),
                 schema, null, true /* createTopicIfDoesNotExist */);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index c3c2f76..39c8687 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -48,11 +48,11 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-            SubscriptionMode subscriptionMode, MessageId startMessageId, 
Schema<T> schema,
+            MessageId startMessageId, Schema<T> schema,
             ConsumerInterceptors<T> interceptors,
             boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, listenerExecutor, partitionIndex, 
hasParentConsumer, subscribeFuture,
-                subscriptionMode, startMessageId, 0 /* 
startMessageRollbackDurationInSec */, schema, interceptors,
+                startMessageId, 0 /* startMessageRollbackDurationInSec */, 
schema, interceptors,
                 createTopicIfDoesNotExist);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 823ca9e..693fde0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 @Data
@@ -59,6 +60,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
 
+    private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
+
     @JsonIgnore
     private MessageListener<T> messageListener;
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index a757651..dcb2e36 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -18,25 +18,32 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import io.netty.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-
 public class ConsumerImplTest {
 
 
@@ -62,7 +69,7 @@ public class ConsumerImplTest {
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, false, subscribeFuture, 
SubscriptionMode.Durable, null, null, null,
+                executorService, -1, false, subscribeFuture, null, null, null,
                 true);
     }
 

Reply via email to