This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new deafc24 client: make SubscriptionMode a member of
ConsumerConfigurationData (#6337)
deafc24 is described below
commit deafc24b205122767ad9ad86e48c591779547e31
Author: Jia Zhai <[email protected]>
AuthorDate: Tue Feb 18 10:32:50 2020 +0800
client: make SubscriptionMode a member of ConsumerConfigurationData (#6337)
Currently, SubscriptionMode is a parameter to create ConsumerImpl, but it
is not exported out, and user could not set this value for consumer. This
change tries to make SubscriptionMode a member of ConsumerConfigurationData, so
user could set this parameter when create consumer.
(cherry picked from commit 208af7cd7e718d378707df4aba600be3202e23df)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 1 -
.../apache/pulsar/client/api/ConsumerBuilder.java | 15 ++++++++++
.../apache/pulsar/client/api/SubscriptionMode.java | 31 +++++++++++++++++++++
.../pulsar/client/impl/ConsumerBuilderImpl.java | 8 ++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 32 +++++++++++-----------
.../client/impl/MultiTopicsConsumerImpl.java | 13 ++++-----
.../pulsar/client/impl/PulsarClientImpl.java | 3 +-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 4 +--
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 4 +--
.../impl/conf/ConsumerConfigurationData.java | 3 ++
.../pulsar/client/impl/ConsumerImplTest.java | 27 +++++++++++-------
11 files changed, 101 insertions(+), 40 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index a6db7c6..44dd93f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -110,7 +110,6 @@ public class RawReaderImpl implements RawReader {
TopicName.getPartitionIndex(conf.getSingleTopic()),
false,
consumerFuture,
- SubscriptionMode.Durable,
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index f63e95f..0c003de 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -229,6 +229,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
/**
+ * Select the subscription mode to be used when subscribing to the topic.
+ *
+ * <p>Options are:
+ * <ul>
+ * <li>{@link SubscriptionMode#Durable} (Default)</li>
+ * <li>{@link SubscriptionMode#NonDurable}</li>
+ * </ul>
+ *
+ * @param subscriptionMode
+ * the subscription mode value
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
+
+ /**
* Sets a {@link MessageListener} for the consumer
*
* <p>When a {@link MessageListener} is set, application will receive
messages through it. Calls to
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
new file mode 100644
index 0000000..7e11bbb
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Types of subscription mode supported by Pulsar.
+ */
+public enum SubscriptionMode {
+ // Make the subscription to be backed by a durable cursor that will retain
messages and persist the current
+ // position
+ Durable,
+
+ // Lightweight subscription mode that doesn't have a durable cursor
associated
+ NonDurable
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f7848b6..f265f38 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -192,6 +193,13 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode
subscriptionMode) {
+ conf.setSubscriptionMode(subscriptionMode);
+ return this;
+ }
+
+
+ @Override
public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T>
messageListener) {
conf.setMessageListener(messageListener);
return this;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7211d2..fce4cc0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import
org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -150,39 +151,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
- enum SubscriptionMode {
- // Make the subscription to be backed by a durable cursor that will
retain messages and persist the current
- // position
- Durable,
- // Lightweight subscription mode that doesn't have a durable cursor
associated
- NonDurable
- }
-
- static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String
topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, boolean
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId,
Schema<T> schema, ConsumerInterceptors<T> interceptors,
- boolean createTopicIfDoesNotExist) {
+ static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
+ String topic,
+ ConsumerConfigurationData<T>
conf,
+ ExecutorService
listenerExecutor,
+ int partitionIndex,
+ boolean hasParentConsumer,
+ CompletableFuture<Consumer<T>>
subscribeFuture,
+ MessageId startMessageId,
+ Schema<T> schema,
+ ConsumerInterceptors<T>
interceptors,
+ boolean
createTopicIfDoesNotExist) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf,
listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture,
- subscriptionMode, startMessageId, schema, interceptors,
+ startMessageId, schema, interceptors,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor,
partitionIndex, hasParentConsumer,
- subscribeFuture, subscriptionMode, startMessageId, 0 /*
rollback time in sec to start msgId */,
+ subscribeFuture, startMessageId, 0 /* rollback time in sec
to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean
hasParentConsumer,
- CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode
subscriptionMode, MessageId startMessageId,
+ CompletableFuture<Consumer<T>> subscribeFuture, MessageId
startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(),
listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
- this.subscriptionMode = subscriptionMode;
+ this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new
BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest
: startMessageId;
this.initialStartMessageId = this.startMessageId;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 9c95d17..3e9e69a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
@@ -833,10 +832,10 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(client, partitionName,
- configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, true, subFuture,
- SubscriptionMode.Durable, null, schema,
interceptors,
- createIfDoesNotExist);
+ configurationData,
client.externalExecutorProvider().getExecutor(),
+ partitionIndex, true, subFuture,
+ null, schema, interceptors,
+ createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
return subFuture;
})
@@ -847,7 +846,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client,
topicName, internalConfig,
- client.externalExecutorProvider().getExecutor(), -1, true,
subFuture, SubscriptionMode.Durable, null,
+ client.externalExecutorProvider().getExecutor(), -1, true,
subFuture, null,
schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
@@ -1118,7 +1117,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
ConsumerImpl<T> newConsumer =
ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
- partitionIndex, true, subFuture,
SubscriptionMode.Durable, null, schema, interceptors,
+ partitionIndex, true, subFuture, null, schema,
interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(),
newConsumer);
if (log.isDebugEnabled()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5a2a6da..f51fb6b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -64,7 +64,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -354,7 +353,7 @@ public class PulsarClientImpl implements PulsarClient {
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this,
topic, conf, listenerThread, partitionIndex, false,
- consumerSubscribedFuture, SubscriptionMode.Durable,
null, schema, interceptors,
+ consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index a3ff33c..5799877 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
@@ -47,6 +46,7 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+ consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
@@ -83,7 +83,7 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx =
TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client,
readerConfiguration.getTopicName(), consumerConfiguration,
- listenerExecutor, partitionIdx, false, consumerFuture,
SubscriptionMode.NonDurable,
+ listenerExecutor, partitionIdx, false, consumerFuture,
readerConfiguration.getStartMessageId(),
readerConfiguration.getStartMessageFromRollbackDurationInSec(),
schema, null, true /* createTopicIfDoesNotExist */);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index c3c2f76..39c8687 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -48,11 +48,11 @@ public class ZeroQueueConsumerImpl<T> extends
ConsumerImpl<T> {
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId,
Schema<T> schema,
+ MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, listenerExecutor, partitionIndex,
hasParentConsumer, subscribeFuture,
- subscriptionMode, startMessageId, 0 /*
startMessageRollbackDurationInSec */, schema, interceptors,
+ startMessageId, 0 /* startMessageRollbackDurationInSec */,
schema, interceptors,
createTopicIfDoesNotExist);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 823ca9e..693fde0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
@Data
@@ -59,6 +60,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+ private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
+
@JsonIgnore
private MessageListener<T> messageListener;
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index a757651..dcb2e36 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -18,25 +18,32 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import io.netty.util.Timer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-
public class ConsumerImplTest {
@@ -62,7 +69,7 @@ public class ConsumerImplTest {
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
- executorService, -1, false, subscribeFuture,
SubscriptionMode.Durable, null, null, null,
+ executorService, -1, false, subscribeFuture, null, null, null,
true);
}