Repository: kafka Updated Branches: refs/heads/trunk eb2619cac -> 54ba2280f
KAFKA-3840; Allow clients default OS buffer sizes Follow up on KAFKA-724 (#1469) to allow OS socket buffer sizes auto tuning for both the broker and the clients. Author: Sebastien Launay <[email protected]> Reviewers: Sriharsha Chintalapani <[email protected]>, Grant Henke <[email protected]>, Ismael Juma <[email protected]> Closes #1507 from slaunay/enhancement/os-socket-buffer-size-tuning-for-clients Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54ba2280 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54ba2280 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54ba2280 Branch: refs/heads/trunk Commit: 54ba2280f0c42b0bd3d74c197f97d8a12617a847 Parents: eb2619c Author: Sebastien Launay <[email protected]> Authored: Thu Jun 16 14:34:58 2016 +0200 Committer: Ismael Juma <[email protected]> Committed: Thu Jun 16 14:34:58 2016 +0200 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 4 +-- .../kafka/clients/producer/ProducerConfig.java | 4 +-- .../clients/consumer/KafkaConsumerTest.java | 30 ++++++++++++++++++++ .../clients/producer/KafkaProducerTest.java | 28 ++++++++++++++++++ 4 files changed, 62 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index b7fc1d2..de10bed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -240,13 +240,13 @@ public class ConsumerConfig extends AbstractConfig { .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, - atLeast(0), + atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) .define(RECEIVE_BUFFER_CONFIG, Type.INT, 64 * 1024, - atLeast(0), + atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(FETCH_MIN_BYTES_CONFIG, http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 47eb309..c493f67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -232,8 +232,8 @@ public class ProducerConfig extends AbstractConfig { .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC) .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC) .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CommonClientConfigs.CLIENT_ID_DOC) - .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) - .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(-1), Importance.MEDIUM, CommonClientConfigs.RECEIVE_BUFFER_DOC) .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 694faf2..3cbb62f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.CompressionType; @@ -59,7 +60,9 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -95,6 +98,33 @@ public class KafkaConsumerTest { } @Test + public void testOsDefaultSocketBufferSizes() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ConsumerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>( + config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + consumer.close(); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketSendBufferSize() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ConsumerConfig.SEND_BUFFER_CONFIG, -2); + new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketReceiveBufferSize() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2); + new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + @Test public void testSubscription() { KafkaConsumer<byte[], byte[]> consumer = newConsumer(); http://git-wip-us.apache.org/repos/asf/kafka/blob/54ba2280/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 2dada8c..461e3cf 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.serialization.StringSerializer; @@ -95,4 +96,31 @@ public class KafkaProducerTest { MockProducerInterceptor.resetCounters(); } } + + @Test + public void testOsDefaultSocketBufferSizes() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>( + config, new ByteArraySerializer(), new ByteArraySerializer()); + producer.close(); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketSendBufferSize() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2); + new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); + } + + @Test(expected = KafkaException.class) + public void testInvalidSocketReceiveBufferSize() throws Exception { + Map<String, Object> config = new HashMap<>(); + config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2); + new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); + } }
