This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9319d [Client]Add autoPartitionsUpdateInterval for producer and
consumer (#7840)
7d9319d is described below
commit 7d9319d844f21ed104c502bcc6c2b853b10ddc80
Author: Aaron Robert <[email protected]>
AuthorDate: Fri Aug 21 22:16:14 2020 +0800
[Client]Add autoPartitionsUpdateInterval for producer and consumer (#7840)
Motivation
Add auto partitions update interval setting for producer and consumer.
Modifications
add autoUpdatePartitionsInterval to partitioned producer and consumer
---
.../java/org/apache/pulsar/client/api/ConsumerBuilder.java | 12 ++++++++++++
.../java/org/apache/pulsar/client/api/ProducerBuilder.java | 12 ++++++++++++
.../org/apache/pulsar/client/impl/ConsumerBuilderImpl.java | 6 ++++++
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 2 +-
.../apache/pulsar/client/impl/PartitionedProducerImpl.java | 2 +-
.../org/apache/pulsar/client/impl/ProducerBuilderImpl.java | 6 ++++++
.../pulsar/client/impl/conf/ConsumerConfigurationData.java | 7 +++++++
.../pulsar/client/impl/conf/ProducerConfigurationData.java | 6 ++++++
.../pulsar/client/impl/conf/ConfigurationDataUtilsTest.java | 5 +++++
9 files changed, 56 insertions(+), 2 deletions(-)
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 9bb8ce0..ac26df1 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -553,6 +553,18 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
+ * Set the interval of updating partitions <i>(default: 1 minute)</i>.
This only works if autoUpdatePartitions is
+ * enabled.
+ *
+ * @param interval
+ * the interval of updating partitions
+ * @param unit
+ * the time unit of the interval.
+ * @return the consumer builder instance
+ */
+ ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit
unit);
+
+ /**
* Set KeyShared subscription policy for consumer.
*
* <p>By default, KeyShared subscription use auto split hash range to
maintain consumers. If you want to
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index f94e6c2..0ca4941 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -504,6 +504,18 @@ public interface ProducerBuilder<T> extends Cloneable {
ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
/**
+ * Set the interval of updating partitions <i>(default: 1 minute)</i>.
This only works if autoUpdatePartitions is
+ * enabled.
+ *
+ * @param interval
+ * the interval of updating partitions
+ * @param unit
+ * the time unit of the interval.
+ * @return the producer builder instance
+ */
+ ProducerBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit
unit);
+
+ /**
* Control whether enable the multiple schema mode for producer.
* If enabled, producer can send a message with different schema from that
specified just when it is created,
* otherwise a invalid message exception would be threw
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cca7abd..8429598 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -384,6 +384,12 @@ public class ConsumerBuilderImpl<T> implements
ConsumerBuilder<T> {
}
@Override
+ public ConsumerBuilder<T> autoUpdatePartitionsInterval(int interval,
TimeUnit unit) {
+ conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+ return this;
+ }
+
+ @Override
public ConsumerBuilder<T> startMessageIdInclusive() {
conf.setResetIncludeHead(true);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 480dc51..a61952c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -138,7 +138,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new
TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, 1,
TimeUnit.MINUTES);
+ .newTimeout(partitionsAutoUpdateTimerTask,
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
if (conf.getTopicNames().isEmpty()) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 73dcb4d..95a104d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -81,7 +81,7 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new
TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, 1,
TimeUnit.MINUTES);
+ .newTimeout(partitionsAutoUpdateTimerTask,
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 9aaec55..64a22d7 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -295,6 +295,12 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
}
@Override
+ public ProducerBuilder<T> autoUpdatePartitionsInterval(int interval,
TimeUnit unit) {
+ conf.setAutoUpdatePartitionsIntervalSeconds(interval, unit);
+ return this;
+ }
+
+ @Override
public ProducerBuilder<T> enableMultiSchema(boolean multiSchema) {
conf.setMultiSchema(multiSchema);
return this;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 14595e6..7f5d65b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -119,6 +119,8 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
+ private long autoUpdatePartitionsIntervalSeconds = 60;
+
private boolean replicateSubscriptionState = false;
private boolean resetIncludeHead = false;
@@ -127,6 +129,11 @@ public class ConsumerConfigurationData<T> implements
Serializable, Cloneable {
private boolean batchIndexAckEnabled = false;
+ public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit
timeUnit) {
+ checkArgument(interval > 0, "interval needs to be > 0");
+ this.autoUpdatePartitionsIntervalSeconds =
timeUnit.toSeconds(interval);
+ }
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index fd2f678..c48598a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -95,6 +95,8 @@ public class ProducerConfigurationData implements
Serializable, Cloneable {
private boolean autoUpdatePartitions = true;
+ private long autoUpdatePartitionsIntervalSeconds = 60;
+
private boolean multiSchema = true;
private SortedMap<String, String> properties = new TreeMap<>();
@@ -163,4 +165,8 @@ public class ProducerConfigurationData implements
Serializable, Cloneable {
return this.batchingPartitionSwitchFrequencyByPublishDelay *
batchingMaxPublishDelayMicros;
}
+ public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit
timeUnit) {
+ checkArgument(interval > 0, "interval needs to be > 0");
+ this.autoUpdatePartitionsIntervalSeconds =
timeUnit.toSeconds(interval);
+ }
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index 9634a87..a1bafbd 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -62,6 +63,7 @@ public class ConfigurationDataUtilsTest {
confData.setProducerName("unset");
confData.setBatchingEnabled(true);
confData.setBatchingMaxMessages(1234);
+ confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("producerName", "test-producer");
config.put("batchingEnabled", false);
@@ -70,6 +72,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-producer", confData.getProducerName());
assertFalse(confData.isBatchingEnabled());
assertEquals(1234, confData.getBatchingMaxMessages());
+ assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test
@@ -78,6 +81,7 @@ public class ConfigurationDataUtilsTest {
confData.setSubscriptionName("unknown-subscription");
confData.setPriorityLevel(10000);
confData.setConsumerName("unknown-consumer");
+ confData.setAutoUpdatePartitionsIntervalSeconds(1, TimeUnit.MINUTES);
Map<String, Object> config = new HashMap<>();
config.put("subscriptionName", "test-subscription");
config.put("priorityLevel", 100);
@@ -85,6 +89,7 @@ public class ConfigurationDataUtilsTest {
assertEquals("test-subscription", confData.getSubscriptionName());
assertEquals(100, confData.getPriorityLevel());
assertEquals("unknown-consumer", confData.getConsumerName());
+ assertEquals(60,confData.getAutoUpdatePartitionsIntervalSeconds());
}
@Test