This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd2178ca80 [feature][client] PIP-184: Topic specific consumer priorityLevel (#16715)
cfd2178ca80 is described below

commit cfd2178ca80e4b6db89385cd39646f20f1c2c066
Author: Dave Maughan <[email protected]>
AuthorDate: Tue Aug 2 12:37:49 2022 +0100

    [feature][client] PIP-184: Topic specific consumer priorityLevel (#16715)
---
 .../apache/pulsar/client/api/ConsumerBuilder.java  | 34 +++++++++
 .../pulsar/client/api/TopicConsumerBuilder.java    | 47 ++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 30 ++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  5 +-
 .../client/impl/TopicConsumerBuilderImpl.java      | 43 +++++++++++
 .../impl/conf/ConsumerConfigurationData.java       | 16 ++++
 .../impl/conf/TopicConsumerConfigurationData.java  | 85 ++++++++++++++++++++++
 .../client/impl/ConsumerBuilderImplTest.java       | 18 +++++
 .../pulsar/client/impl/ConsumerImplTest.java       | 26 ++++++-
 .../client/impl/TopicConsumerBuilderImplTest.java  | 55 ++++++++++++++
 .../impl/conf/ConsumerConfigurationDataTest.java   | 48 ++++++++++++
 .../conf/TopicConsumerConfigurationDataTest.java   | 74 +++++++++++++++++++
 site2/docs/client-libraries-java.md                |  2 +-
 13 files changed, 477 insertions(+), 6 deletions(-)
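
A minimal sketch, not part of the commit, of the lookup rule that ConsumerConfigurationData.getMatchingTopicConfiguration introduces in the diff below: the first registered topic configuration whose matcher accepts the topic name supplies the priority level, and the consumer-level priorityLevel applies when nothing matches. It uses only JDK types. TopicPrioritySketch, TopicOverride, forTopicName, forTopicsPattern and resolvePriorityLevel are hypothetical names invented for this illustration; they are not Pulsar classes or methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical stand-in for the consumer configuration; NOT a Pulsar class.
class TopicPrioritySketch {
    // One topic-specific override: a name matcher plus the priority it assigns.
    static final class TopicOverride {
        final Predicate<String> matcher;
        final int priorityLevel;

        TopicOverride(Predicate<String> matcher, int priorityLevel) {
            this.matcher = matcher;
            this.priorityLevel = priorityLevel;
        }
    }

    private final int defaultPriorityLevel;
    private final List<TopicOverride> overrides = new ArrayList<>();

    TopicPrioritySketch(int defaultPriorityLevel) {
        this.defaultPriorityLevel = defaultPriorityLevel;
    }

    // Exact-name override, analogous to topicConfiguration(String) in the diff.
    TopicPrioritySketch forTopicName(String topicName, int priorityLevel) {
        overrides.add(new TopicOverride(topicName::equals, priorityLevel));
        return this;
    }

    // Regex override, analogous to topicConfiguration(Pattern); the whole name must match.
    TopicPrioritySketch forTopicsPattern(Pattern topicsPattern, int priorityLevel) {
        overrides.add(new TopicOverride(
                name -> topicsPattern.matcher(name).matches(), priorityLevel));
        return this;
    }

    // First registered match wins; otherwise fall back to the consumer-level default.
    int resolvePriorityLevel(String topicName) {
        return overrides.stream()
                .filter(o -> o.matcher.test(topicName))
                .findFirst()
                .map(o -> o.priorityLevel)
                .orElse(defaultPriorityLevel);
    }

    public static void main(String[] args) {
        TopicPrioritySketch conf = new TopicPrioritySketch(1)
                .forTopicsPattern(Pattern.compile("^foo$"), 2)
                .forTopicName("foo", 5); // shadowed by the pattern registered first

        System.out.println(conf.resolvePriorityLevel("foo")); // 2
        System.out.println(conf.resolvePriorityLevel("bar")); // 1
    }
}
```

The values mirror ConsumerConfigurationDataTest in the diff: with a default of 1 and a ^foo$ override of 2, foo resolves to 2 and bar to 1. Because the lookup uses findFirst, the order in which overrides are registered decides which one applies when several match.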

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index e303e55538e..804aa7097f3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -813,4 +813,38 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * @param enabled whether to enable AutoScaledReceiverQueueSize.
      */
     ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicName a topic name
+     * @return a {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> topicConfiguration(String topicName);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicName a topic name
+     * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> topicConfiguration(String topicName,
+                                          java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicsPattern a regular expression to match a topic name
+     * @return a {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern);
+
+    /**
+     * Configure topic specific options to override those set at the {@link ConsumerBuilder} level.
+     *
+     * @param topicsPattern a regular expression to match a topic name
+     * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
+                                          java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java
new file mode 100644
index 00000000000..5096f525776
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the
+ * {@link ConsumerBuilder} level.
+ *
+ * @see ConsumerBuilder#topicConfiguration(String)
+ *
+ * @param <T> the type of the value in the {@link ConsumerBuilder}
+ */
+public interface TopicConsumerBuilder<T> {
+    /**
+     * Configure the priority level of this topic.
+     *
+     * @see ConsumerBuilder#priorityLevel(int)
+     *
+     * @param priorityLevel the priority of this topic
+     * @return the {@link TopicConsumerBuilder} instance
+     */
+    TopicConsumerBuilder<T> priorityLevel(int priorityLevel);
+
+    /**
+     * Complete the configuration of the topic specific options and return control back to the
+     * {@link ConsumerBuilder} instance.
+     *
+     * @return the {@link ConsumerBuilder} instance
+     */
+    ConsumerBuilder<T> build();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 616ed86abb8..d30e72aa53c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -52,8 +52,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TopicConsumerBuilder;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -537,4 +539,32 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         conf.setAutoScaledReceiverQueueSizeEnabled(enabled);
         return this;
     }
+
+    @Override
+    public TopicConsumerBuilder<T> topicConfiguration(String topicName) {
+        TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicName(topicName, conf);
+        conf.getTopicConfigurations().add(topicConf);
+        return new TopicConsumerBuilderImpl<>(this, topicConf);
+    }
+
+    @Override
+    public ConsumerBuilder<T> topicConfiguration(String topicName,
+                                                 java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
+        builderConsumer.accept(topicConfiguration(topicName));
+        return this;
+    }
+
+    @Override
+    public TopicConsumerBuilder<T> topicConfiguration(Pattern topicsPattern) {
+        TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.ofTopicsPattern(topicsPattern, conf);
+        conf.getTopicConfigurations().add(topicConf);
+        return new TopicConsumerBuilderImpl<>(this, topicConf);
+    }
+
+    @Override
+    public ConsumerBuilder<T> topicConfiguration(Pattern topicsPattern,
+                                                 java.util.function.Consumer<TopicConsumerBuilder<T>> builderConsumer) {
+        builderConsumer.accept(topicConfiguration(topicsPattern));
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5bc998c4526..c474da345c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -61,6 +61,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import lombok.AccessLevel;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -146,6 +148,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final NegativeAcksTracker negativeAcksTracker;
 
     protected final ConsumerStatsRecorder stats;
+    @Getter(AccessLevel.PACKAGE)
     private final int priorityLevel;
     private final SubscriptionMode subscriptionMode;
     private volatile BatchMessageIdImpl startMessageId;
@@ -266,7 +269,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.partitionIndex = partitionIndex;
         this.hasParentConsumer = hasParentConsumer;
         this.parentConsumerHasListener = parentConsumerHasListener;
-        this.priorityLevel = conf.getPriorityLevel();
+        this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
         this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java
new file mode 100644
index 00000000000..33f91366584
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import lombok.RequiredArgsConstructor;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.TopicConsumerBuilder;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
+
+@RequiredArgsConstructor
+class TopicConsumerBuilderImpl<T> implements TopicConsumerBuilder<T> {
+    private final ConsumerBuilder<T> consumerBuilder;
+    private final TopicConsumerConfigurationData topicConf;
+
+    @Override
+    public TopicConsumerBuilder<T> priorityLevel(int priorityLevel) {
+        checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0");
+        topicConf.setPriorityLevel(priorityLevel);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder<T> build() {
+        return consumerBuilder;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 6c22d143a6f..dcde042f4e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Sets;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -164,6 +166,20 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private boolean autoScaledReceiverQueueSizeEnabled = false;
 
+    private List<TopicConsumerConfigurationData> topicConfigurations = new ArrayList<>();
+
+    public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) {
+        return topicConfigurations.stream()
+                .filter(topicConf -> topicConf.getTopicNameMatcher().matches(topicName))
+                .findFirst()
+                .orElseGet(() -> TopicConsumerConfigurationData.ofTopicName(topicName, this));
+    }
+
+    public void setTopicConfigurations(List<TopicConsumerConfigurationData> topicConfigurations) {
+        checkArgument(topicConfigurations != null, "topicConfigurations should not be null.");
+        this.topicConfigurations = topicConfigurations;
+    }
+
     public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) {
         checkArgument(interval > 0, "interval needs to be > 0");
         this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java
new file mode 100644
index 00000000000..e6d7a9aa0d7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+import java.util.regex.Pattern;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TopicConsumerConfigurationData implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private TopicNameMatcher topicNameMatcher;
+    private int priorityLevel;
+
+    public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern, int priorityLevel) {
+        return of(new TopicNameMatcher.TopicsPattern(topicsPattern), priorityLevel);
+    }
+
+    public static TopicConsumerConfigurationData ofTopicsPattern(@NonNull Pattern topicsPattern,
+                                                                 ConsumerConfigurationData<?> conf) {
+        return ofTopicsPattern(topicsPattern, conf.getPriorityLevel());
+    }
+
+    public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName, int priorityLevel) {
+        return of(new TopicNameMatcher.TopicName(topicName), priorityLevel);
+    }
+
+    public static TopicConsumerConfigurationData ofTopicName(@NonNull String topicName,
+                                                             ConsumerConfigurationData<?> conf) {
+        return ofTopicName(topicName, conf.getPriorityLevel());
+    }
+
+    static TopicConsumerConfigurationData of(@NonNull TopicNameMatcher topicNameMatcher, int priorityLevel) {
+        return new TopicConsumerConfigurationData(topicNameMatcher, priorityLevel);
+    }
+
+    public interface TopicNameMatcher extends Serializable {
+        boolean matches(String topicName);
+
+        @RequiredArgsConstructor
+        class TopicsPattern implements TopicNameMatcher {
+            @NonNull
+            private final Pattern topicsPattern;
+
+            @Override
+            public boolean matches(String topicName) {
+                return topicsPattern.matcher(topicName).matches();
+            }
+        }
+
+        @RequiredArgsConstructor
+        class TopicName implements TopicNameMatcher {
+            @NonNull
+            private final String topicName;
+
+            @Override
+            public boolean matches(String topicName) {
+                return this.topicName.equals(topicName);
+            }
+        }
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
index ff60caca2c1..32afe69c3d0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java
@@ -18,10 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertNotNull;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -31,12 +33,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -338,4 +342,18 @@ public class ConsumerBuilderImplTest {
         consumerBuilderImpl.startPaused(true);
         verify(consumerBuilderImpl.getConf()).setStartPaused(true);
     }
+
+    @Test
+    public void testTopicConsumerBuilder() {
+        List<TopicConsumerConfigurationData> topicConsumerConfigurationDataList = new ArrayList<>();
+        when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList);
+
+        ConsumerBuilder<?> consumerBuilder = consumerBuilderImpl.topicConfiguration(Pattern.compile("foo")).priorityLevel(1).build();
+
+        assertThat(consumerBuilder).isSameAs(consumerBuilderImpl);
+        assertThat(topicConsumerConfigurationDataList).hasSize(1);
+        TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0);
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index fcea9d49070..756c18441b3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -39,6 +40,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -47,23 +49,28 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ConsumerImplTest {
+    private final String topic = "non-persistent://tenant/ns1/my-topic";
 
     private ExecutorProvider executorProvider;
     private ExecutorService internalExecutor;
     private ConsumerImpl<byte[]> consumer;
-    private ConsumerConfigurationData consumerConf;
+    private ConsumerConfigurationData<byte[]> consumerConf;
 
     @BeforeMethod(alwaysRun = true)
     public void setUp() {
+        consumerConf = new ConsumerConfigurationData<>();
+        createConsumer(consumerConf);
+    }
+
+    private void createConsumer(ConsumerConfigurationData consumerConf) {
         executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
         internalExecutor = Executors.newSingleThreadScheduledExecutor();
-        consumerConf = new ConsumerConfigurationData<>();
+
         PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
         ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
-        CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
-        String topic = "non-persistent://tenant/ns1/my-topic";
+        CompletableFuture<Consumer<byte[]>> subscribeFuture = new CompletableFuture<>();
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
@@ -239,4 +246,15 @@ public class ConsumerImplTest {
         Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 100);
         Assert.assertEquals(consumer.getAvailablePermits(), permits + 100);
     }
+
+    @Test
+    public void testTopicPriorityLevel() {
+        ConsumerConfigurationData<Object> consumerConf = new ConsumerConfigurationData<>();
+        consumerConf.getTopicConfigurations().add(
+                TopicConsumerConfigurationData.ofTopicName(topic, 1));
+
+        createConsumer(consumerConf);
+
+        assertThat(consumer.getPriorityLevel()).isEqualTo(1);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java
new file mode 100644
index 00000000000..cfc2380cebe
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicConsumerBuilderImplTest {
+    private TopicConsumerConfigurationData topicConsumerConfigurationData;
+    private TopicConsumerBuilderImpl<String> topicConsumerBuilderImpl;
+
+    @SuppressWarnings("unchecked")
+    @BeforeMethod(alwaysRun = true)
+    public void setup() {
+        ConsumerBuilder<String> consumerBuilder = mock(ConsumerBuilder.class);
+        topicConsumerConfigurationData = mock(TopicConsumerConfigurationData.class);
+        topicConsumerBuilderImpl = new TopicConsumerBuilderImpl<>(consumerBuilder, topicConsumerConfigurationData);
+    }
+
+    @Test
+    public void testInvalidPriorityLevel() {
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> topicConsumerBuilderImpl.priorityLevel(-1));
+        verify(topicConsumerConfigurationData, never()).setPriorityLevel(anyInt());
+    }
+
+    @Test
+    public void testValidPriorityLevel() {
+        topicConsumerBuilderImpl.priorityLevel(0);
+        verify(topicConsumerConfigurationData).setPriorityLevel(0);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
new file mode 100644
index 00000000000..0ec031d2505
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.regex.Pattern;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ConsumerConfigurationDataTest {
+    @DataProvider(name = "topicConf")
+    public Object[][] topicConf() {
+        return new Object[][] {
+                new Object[] {"foo", 2},
+                new Object[] {"bar", 1}
+        };
+    }
+
+    @Test(dataProvider = "topicConf")
+    public void testTopicConsumerConfigurationData(String topicName, int expectedPriority) {
+        ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();
+        consumerConfigurationData.setPriorityLevel(1);
+
+        consumerConfigurationData.getTopicConfigurations()
+                .add(TopicConsumerConfigurationData.ofTopicsPattern(Pattern.compile("^foo$"), 2));
+
+        TopicConsumerConfigurationData topicConsumerConfigurationData =
+                consumerConfigurationData.getMatchingTopicConfiguration(topicName);
+
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority);
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java
new file mode 100644
index 00000000000..a2bea68d1ac
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import java.util.regex.Pattern;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TopicConsumerConfigurationDataTest {
+    @Test
+    public void testOfFactoryMethod() {
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicName("foo", 1);
+
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
+
+    @Test
+    public void testOfDefaultFactoryMethod() {
+        ConsumerConfigurationData<Object> consumerConfigurationData = new ConsumerConfigurationData<>();
+        consumerConfigurationData.setPriorityLevel(1);
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicName("foo", consumerConfigurationData);
+
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches("foo")).isTrue();
+        assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1);
+    }
+
+    @DataProvider(name = "topicNameMatch")
+    public Object[][] topicNameMatch() {
+        return new Object[][] {
+                new Object[] {"foo", true},
+                new Object[] {"bar", false}
+        };
+    }
+
+    @Test(dataProvider = "topicNameMatch")
+    public void testTopicNameMatch(String topicName, boolean expectedMatch) {
+        TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData
+                .ofTopicsPattern(Pattern.compile("^foo$"), 1);
+        assertThat(topicConsumerConfigurationData.getTopicNameMatcher().matches(topicName)).isEqualTo(expectedMatch);
+    }
+
+    @Test
+    public void testNullTopicsPattern() {
+        assertThatNullPointerException()
+                .isThrownBy(() -> TopicConsumerConfigurationData.ofTopicsPattern(null, 1));
+    }
+
+    @Test
+    public void testTopicNameMatchNullTopicName() {
+        assertThat(TopicConsumerConfigurationData
+                .ofTopicName("foo", 1).getTopicNameMatcher().matches(null)).isFalse();
+    }
+}
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 6e72d7fdae2..670736ad3ad 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -729,7 +729,7 @@ When you create a consumer, you can use the `loadConf` configuration. The follow
 `consumerName`|String|Consumer name|null
 `ackTimeoutMillis`|long|Timeout of unacked messages|0
 `tickDurationMillis`|long|Granularity of the ack-timeout redelivery.<br /><br />Using an higher `tickDurationMillis` reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).|1000
-`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. <br /><br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />In Shared subscription type, the broker **first dispatches messages to the max priority level consumers if they have permits**. Otherwise, the broker considers next priority level consumers.<br /><br /> **Example 1**<br />If a subscription has c [...]
+`priorityLevel`|int|Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. It can be set at the consumer level so all topics being consumed will have the same priority level or each topic being consumed can be given a different priority level.<br /><br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br /><br />In Shared subscription type, the broker **first dispatches messages to the ma [...]
 `cryptoFailureAction`|ConsumerCryptoFailureAction|Consumer should take action when it receives a message that can not be decrypted.<br /><li>**FAIL**: this is the default option to fail messages until crypto succeeds.</li><li> **DISCARD**:silently acknowledge and not deliver message to an application.</li><li>**CONSUME**: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.</li><br />The decompression of message fails. <br /><br />If  [...]
 `properties`|SortedMap<String, String>|A name or value property of this consumer.<br /><br />`properties` is application defined metadata attached to a consumer. <br /><br />When getting a topic stats, associate this metadata with the consumer stats for easier identification.|new TreeMap()
 `readCompacted`|boolean|If enabling `readCompacted`, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br /><br /> A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br /><br />Only enabling `readCompacted` on subscriptions to persistent topics, which have a single active consumer (like failure  [...]
