This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9319d  [Client]Add autoPartitionsUpdateInterval for producer and 
consumer (#7840)
7d9319d is described below

commit 7d9319d844f21ed104c502bcc6c2b853b10ddc80
Author: Aaron Robert <[email protected]>
AuthorDate: Fri Aug 21 22:16:14 2020 +0800

    [Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
    
    Motivation
    Make the interval of automatic partition updates configurable for the producer and consumer (it was previously fixed at 1 minute).
    
    Modifications
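    Add autoUpdatePartitionsInterval(interval, unit) to the producer and consumer builders; the partitioned producer and the multi-topics consumer use the configured value when scheduling partition updates.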
---
 .../java/org/apache/pulsar/client/api/ConsumerBuilder.java   | 12 ++++++++++++
 .../java/org/apache/pulsar/client/api/ProducerBuilder.java   | 12 ++++++++++++
 .../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java   |  6 ++++++
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   |  2 +-
 .../apache/pulsar/client/impl/PartitionedProducerImpl.java   |  2 +-
 .../org/apache/pulsar/client/impl/ProducerBuilderImpl.java   |  6 ++++++
 .../pulsar/client/impl/conf/ConsumerConfigurationData.java   |  7 +++++++
 .../pulsar/client/impl/conf/ProducerConfigurationData.java   |  6 ++++++
 .../pulsar/client/impl/conf/ConfigurationDataUtilsTest.java  |  5 +++++
 9 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 9bb8ce0..ac26df1 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -553,6 +553,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 
     /**
+     * Set the interval of updating partitions <i>(default: 1 minute)</i>. 
This only works if autoUpdatePartitions is
+     * enabled.
+     *
+     * @param interval
+     *            the interval of updating partitions
+     * @param unit
+     *            the time unit of the interval.
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit 
unit);
+
+    /**
      * Set KeyShared subscription policy for consumer.
      *
      * <p>By default, KeyShared subscription use auto split hash range to 
maintain consumers. If you want to
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index f94e6c2..0ca4941 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -504,6 +504,18 @@ public interface ProducerBuilder<T> extends Cloneable {
     ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 
     /**
+     * Set the interval of updating partitions <i>(default: 1 minute)</i>. 
This only works if autoUpdatePartitions is
+     * enabled.
+     *
+     * @param interval
+     *            the interval of updating partitions
+     * @param unit
+     *            the time unit of the interval.
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit 
unit);
+
+    /**
      * Control whether enable the multiple schema mode for producer.
      * If enabled, producer can send a message with different schema from that 
specified just when it is created,
      * otherwise an invalid message exception would be thrown
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cca7abd..8429598 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -384,6 +384,12 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, 
TimeUnit unit) {
+        conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+        return this;
+    }
+
+    @Override
 
     public ConsumerBuilder<T> startMessageIdInclusive() {
         conf.setResetIncludeHead(true);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 480dc51..a61952c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -138,7 +138,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         if (conf.isAutoUpdatePartitions()) {
             topicsPartitionChangedListener = new 
TopicsPartitionChangedListener();
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, 
TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
 
         if (conf.getTopicNames().isEmpty()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 73dcb4d..95a104d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -81,7 +81,7 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         if (conf.isAutoUpdatePartitions()) {
             topicsPartitionChangedListener = new 
TopicsPartitionChangedListener();
             partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 1, 
TimeUnit.MINUTES);
+                .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9aaec55..64a22d7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -295,6 +295,12 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
     }
 
     @Override
+    public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, 
TimeUnit unit) {
+        conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+        return this;
+    }
+
+    @Override
     public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
         conf.setMultiSchema(multiSchema);
         return this;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 14595e6..7f5d65b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -119,6 +119,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private boolean autoUpdatePartitions = true;
 
+    private long autoUpdatePartitionsIntervalSeconds = 60;
+
     private boolean replicateSubscriptionState = false;
 
     private boolean resetIncludeHead = false;
@@ -127,6 +129,11 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private boolean batchIndexAckEnabled = false;
 
+    public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit 
timeUnit) {
+        checkArgument(interval > 0, "interval needs to be > 0");
+        this.autoUpdatePartitionsIntervalSeconds = 
timeUnit.toSeconds(interval);
+    }
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index fd2f678..c48598a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -95,6 +95,8 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
 
     private boolean autoUpdatePartitions = true;
 
+    private long autoUpdatePartitionsIntervalSeconds = 60;
+
     private boolean multiSchema = true;
 
     private SortedMap<String, String> properties = new TreeMap<>();
@@ -163,4 +165,8 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
         return this.batchingPartitionSwitchFrequencyByPublishDelay * 
batchingMaxPublishDelayMicros;
     }
 
+    public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit 
timeUnit) {
+        checkArgument(interval > 0, "interval needs to be > 0");
+        this.autoUpdatePartitionsIntervalSeconds = 
timeUnit.toSeconds(interval);
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 9634a87..a1bafbd 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -62,6 +63,7 @@ public class ConfigurationDataUtilsTest {
         confData.setProducerName("unset");
         confData.setBatchingEnabled(true);
         confData.setBatchingMaxMessages(1234);
+        confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
         Map<String, Object> config = new HashMap<>();
         config.put("producerName", "test-producer");
         config.put("batchingEnabled", false);
@@ -70,6 +72,7 @@ public class ConfigurationDataUtilsTest {
         assertEquals("test-producer", confData.getProducerName());
         assertFalse(confData.isBatchingEnabled());
         assertEquals(1234, confData.getBatchingMaxMessages());
+        assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
     }
 
     @Test
@@ -78,6 +81,7 @@ public class ConfigurationDataUtilsTest {
         confData.setSubscriptionName("unknown-subscription");
         confData.setPriorityLevel(10000);
         confData.setConsumerName("unknown-consumer");
+        confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
         Map<String, Object> config = new HashMap<>();
         config.put("subscriptionName", "test-subscription");
         config.put("priorityLevel", 100);
@@ -85,6 +89,7 @@ public class ConfigurationDataUtilsTest {
         assertEquals("test-subscription", confData.getSubscriptionName());
         assertEquals(100, confData.getPriorityLevel());
         assertEquals("unknown-consumer", confData.getConsumerName());
+        assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
     }
 
     @Test

Reply via email to