This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 16a554b  Allow to configure most client/producer/consumer options in 
Kafka API wrapper (#1207)
16a554b is described below

commit 16a554bfd5903482512d9a567777ccb21dfd5c01
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Feb 9 21:32:49 2018 -0800

    Allow to configure most client/producer/consumer options in Kafka API 
wrapper (#1207)
---
 .../clients/consumer/PulsarKafkaConsumer.java      | 11 ++--
 .../clients/producer/PulsarKafkaProducer.java      |  8 +--
 ...fkaConfig.java => PulsarClientKafkaConfig.java} | 44 ++++++++++++++-
 .../kafka/compat/PulsarConsumerKafkaConfig.java    | 50 +++++++++++++++++
 .../kafka/compat/PulsarProducerKafkaConfig.java    | 64 ++++++++++++++++++++++
 site/docs/latest/adaptors/KafkaWrapper.md          | 42 +++++++++++---
 6 files changed, 203 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index d3dc6e4..97cde46 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -56,7 +56,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.util.ConsumerName;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -80,6 +81,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, 
V>, MessageListene
 
     private volatile boolean closed = false;
 
+    private final Properties properties;
+
     private static class QueueItem {
         final org.apache.pulsar.client.api.Consumer consumer;
         final Message message;
@@ -141,9 +144,9 @@ public class PulsarKafkaConsumer<K, V> implements 
Consumer<K, V>, MessageListene
 
         String serviceUrl = 
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
-        Properties properties = new Properties();
+        this.properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
-        ClientConfiguration clientConf = 
PulsarKafkaConfig.getClientConfiguration(properties);
+        ClientConfiguration clientConf = 
PulsarClientKafkaConfig.getClientConfiguration(properties);
         // Since this client instance is going to be used just for the 
consumers, we can enable Nagle to group
         // all the acknowledgments sent to broker within a short time frame
         clientConf.setUseTcpNoDelay(false);
@@ -201,7 +204,7 @@ public class PulsarKafkaConsumer<K, V> implements 
Consumer<K, V>, MessageListene
                 // acknowledgeCumulative()
                 int numberOfPartitions = ((PulsarClientImpl) 
client).getNumberOfPartitions(topic).get();
 
-                ConsumerConfiguration conf = new ConsumerConfiguration();
+                ConsumerConfiguration conf = 
PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
                 conf.setSubscriptionType(SubscriptionType.Failover);
                 conf.setMessageListener(this);
                 if (numberOfPartitions > 1) {
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 793a641..7b8bf9a 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
-import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 
 public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
@@ -106,15 +107,14 @@ public class PulsarKafkaProducer<K, V> implements 
Producer<K, V> {
         }
 
         String serviceUrl = 
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
-        ClientConfiguration clientConf = 
PulsarKafkaConfig.getClientConfiguration(properties);
+        ClientConfiguration clientConf = 
PulsarClientKafkaConfig.getClientConfiguration(properties);
         try {
             client = PulsarClient.create(serviceUrl, clientConf);
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
 
-        pulsarProducerConf = new ProducerConfiguration();
-        pulsarProducerConf.setBatchingEnabled(true);
+        pulsarProducerConf = 
PulsarProducerKafkaConfig.getProducerConfiguration(properties);
 
         // To mimic the same batching mode as Kafka, we need to wait a very 
little amount of
         // time to batch if the client is trying to send messages fast enough
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
similarity index 53%
rename from 
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java
rename to 
pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index 2396bac..d9ce75e 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.client.kafka.compat;
 
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.ClientConfiguration;
 
-public class PulsarKafkaConfig {
+public class PulsarClientKafkaConfig {
 
     /// Config variables
     public static final String AUTHENTICATION_CLASS = 
"pulsar.authentication.class";
@@ -31,6 +32,17 @@ public class PulsarKafkaConfig {
     public static final String TLS_TRUST_CERTS_FILE_PATH = 
"pulsar.tls.trust.certs.file.path";
     public static final String TLS_ALLOW_INSECURE_CONNECTION = 
"pulsar.tls.allow.insecure.connection";
 
+    public static final String OPERATION_TIMEOUT_MS = 
"pulsar.operation.timeout.ms";
+    public static final String STATS_INTERVAL_SECONDS = 
"pulsar.stats.interval.seconds";
+    public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+    public static final String CONNECTIONS_PER_BROKER = 
"pulsar.connections.per.broker";
+
+    public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+    public static final String CONCURRENT_LOOKUP_REQUESTS = 
"pulsar.concurrent.lookup.requests";
+    public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION 
= "pulsar.max.number.rejected.request.per.connection";
+
     public static ClientConfiguration getClientConfiguration(Properties 
properties) {
         ClientConfiguration conf = new ClientConfiguration();
 
@@ -52,6 +64,36 @@ public class PulsarKafkaConfig {
             
conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
         }
 
+        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+            
conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+            
conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
 TimeUnit.SECONDS);
+        }
+
+        if (properties.containsKey(NUM_IO_THREADS)) {
+            
conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+        }
+
+        if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+            
conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+        }
+
+        if (properties.containsKey(USE_TCP_NODELAY)) {
+            
conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+        }
+
+        if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+            
conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+        }
+
+        if 
(properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+            conf.setMaxNumberOfRejectedRequestPerConnection(
+                    
Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+        }
+
         return conf;
     }
 }
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
new file mode 100644
index 0000000..f91c484
--- /dev/null
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+
+public class PulsarConsumerKafkaConfig {
+
+    /// Config variables
+    public static final String CONSUMER_NAME = "pulsar.consumer.name";
+    public static final String RECEIVER_QUEUE_SIZE = 
"pulsar.consumer.receiver.queue.size";
+    public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = 
"pulsar.consumer.total.receiver.queue.size.across.partitions";
+
+    public static ConsumerConfiguration getConsumerConfiguration(Properties 
properties) {
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+
+        if (properties.containsKey(CONSUMER_NAME)) {
+            conf.setConsumerName(properties.getProperty(CONSUMER_NAME));
+        }
+
+        if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
+            
conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+        }
+
+        if 
(properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
+            conf.setMaxTotalReceiverQueueSizeAcrossPartitions(
+                    
Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
+        }
+
+        return conf;
+    }
+}
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
new file mode 100644
index 0000000..c2e4886
--- /dev/null
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ProducerConfiguration;
+
+public class PulsarProducerKafkaConfig {
+
+    /// Config variables
+    public static final String PRODUCER_NAME = "pulsar.producer.name";
+    public static final String INITIAL_SEQUENCE_ID = 
"pulsar.producer.initial.sequence.id";
+
+    public static final String MAX_PENDING_MESSAGES = 
"pulsar.producer.max.pending.messages";
+    public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = 
"pulsar.producer.max.pending.messages.across.partitions";
+    public static final String BATCHING_ENABLED = 
"pulsar.producer.batching.enabled";
+    public static final String BATCHING_MAX_MESSAGES = 
"pulsar.producer.batching.max.messages";
+
+    public static ProducerConfiguration getProducerConfiguration(Properties 
properties) {
+        ProducerConfiguration conf = new ProducerConfiguration();
+
+        if (properties.containsKey(PRODUCER_NAME)) {
+            conf.setProducerName(properties.getProperty(PRODUCER_NAME));
+        }
+
+        if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
+            
conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES)) {
+            
conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
+            conf.setMaxPendingMessagesAcrossPartitions(
+                    
Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
+        }
+
+        
conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED,
 "true")));
+
+        if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
+            
conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+        }
+
+        return conf;
+    }
+}
diff --git a/site/docs/latest/adaptors/KafkaWrapper.md 
b/site/docs/latest/adaptors/KafkaWrapper.md
index 53bf2e5..444cb0f 100644
--- a/site/docs/latest/adaptors/KafkaWrapper.md
+++ b/site/docs/latest/adaptors/KafkaWrapper.md
@@ -183,9 +183,9 @@ APIs:
 | `void commitAsync()`                                                         
                           | Yes       |       |
 | `void commitAsync(OffsetCommitCallback callback)`                            
                           | Yes       |       |
 | `void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback)`       | Yes       |       |
-| `void seek(TopicPartition partition, long offset)`                           
                           | Yes        |       |
-| `void seekToBeginning(Collection<TopicPartition> partitions)`                
                           | Yes        |       |
-| `void seekToEnd(Collection<TopicPartition> partitions)`                      
                           | Yes        |       |
+| `void seek(TopicPartition partition, long offset)`                           
                           | Yes       |       |
+| `void seekToBeginning(Collection<TopicPartition> partitions)`                
                           | Yes       |       |
+| `void seekToEnd(Collection<TopicPartition> partitions)`                      
                           | Yes       |       |
 | `long position(TopicPartition partition)`                                    
                           | Yes       |       |
 | `OffsetAndMetadata committed(TopicPartition partition)`                      
                           | Yes       |       |
 | `Map<MetricName, ? extends Metric> metrics()`                                
                           | No        |       |
@@ -229,11 +229,39 @@ Properties:
 
 You can configure Pulsar authentication provider directly from the Kafka 
properties.
 
-Properties:
+### Pulsar client properties:
 
 | Config property                        | Default | Notes                     
                                                             |
 
|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
 | `pulsar.authentication.class`          |         | Configure to auth 
provider. Eg. `org.apache.pulsar.client.impl.auth.AuthenticationTls` |
-| `pulsar.use.tls`                       | `false` | Enable TLS transport 
encryption                                                        |
-| `pulsar.tls.trust.certs.file.path`     |         | Path for the TLS trust 
certificate store                                               |
-| `pulsar.tls.allow.insecure.connection` | `false` | Accept self-signed 
certificates from brokers                                           |
+| 
[`pulsar.use.tls`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTls-boolean-)
                       | `false` | Enable TLS transport encryption              
                                          |
+| 
[`pulsar.tls.trust.certs.file.path`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsTrustCertsFilePath-java.lang.String-)
   |         | Path for the TLS trust certificate store                         
                      |
+| 
[`pulsar.tls.allow.insecure.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setTlsAllowInsecureConnection-boolean-)
 | `false` | Accept self-signed certificates from brokers                       
                    |
+| 
[`pulsar.operation.timeout.ms`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setOperationTimeout-int-java.util.concurrent.TimeUnit-)
 | `30000` | General operations timeout |
+| 
[`pulsar.stats.interval.seconds`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setStatsInterval-long-java.util.concurrent.TimeUnit-)
 | `60` | Pulsar client lib stats printing interval |
+| 
[`pulsar.num.io.threads`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setIoThreads-int-)
 | `1` | Number of Netty IO threads to use |
+| 
[`pulsar.connections.per.broker`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConnectionsPerBroker-int-)
 | `1` | Max number of connection to open to each broker |
+| 
[`pulsar.use.tcp.nodelay`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setUseTcpNoDelay-boolean-)
 | `true` | TCP no-delay |
+| 
[`pulsar.concurrent.lookup.requests`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setConcurrentLookupRequest-int-)
 | `50000` | Max number of concurrent topic lookups |
+| 
[`pulsar.max.number.rejected.request.per.connection`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ClientConfiguration.html#setMaxNumberOfRejectedRequestPerConnection-int-)
 | `50` | Threshold of errors to forcefully close a connection |
+
+
+### Pulsar producer properties
+
+| Config property                        | Default | Notes                     
                                                             |
+|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
+| 
[`pulsar.producer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setProducerName-java.lang.String-)
 | | Specify producer name |
+| 
[`pulsar.producer.initial.sequence.id`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setInitialSequenceId-long-)
 |  | Specify baseline for sequence id for this producer |
+| 
[`pulsar.producer.max.pending.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessages-int-)
 | `1000` | Set the max size of the queue holding the messages pending to 
receive an acknowledgment from the broker.  |
+| 
[`pulsar.producer.max.pending.messages.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setMaxPendingMessagesAcrossPartitions-int-)
 | `50000` | Set the number of max pending messages across all the partitions  |
+| 
[`pulsar.producer.batching.enabled`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingEnabled-boolean-)
 | `true` | Control whether automatic batching of messages is enabled for the 
producer |
+| 
[`pulsar.producer.batching.max.messages`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerConfiguration.html#setBatchingMaxMessages-int-)
 | `1000` | The maximum number of messages permitted in a batch |
+
+
+### Pulsar consumer Properties
+
+| Config property                        | Default | Notes                     
                                                             |
+|:---------------------------------------|:--------|:---------------------------------------------------------------------------------------|
+| 
[`pulsar.consumer.name`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setConsumerName-java.lang.String-)
 | | Set the consumer name |
+| 
[`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-)
 | 1000 | Sets the size of the consumer receive queue |
+| 
[`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-)
 | 50000 | Set the max total receiver queue size across partitons |

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to