This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 565171f add config to control kafka fetcher size and increase default (#3869)
565171f is described below
commit 565171f23cc4af4e5a2c68b0d64194561a34ea9a
Author: James Shao <[email protected]>
AuthorDate: Mon Mar 18 17:08:32 2019 -0700
add config to control kafka fetcher size and increase default (#3869)
* add config to control kafka fetcher size and increase default
* update per cr feedback
* change default back to old value
---
.../impl/kafka/KafkaConnectionHandler.java | 26 +++++++----
.../impl/kafka/KafkaLowLevelStreamConfig.java | 34 +++++++++++---
.../impl/kafka/KafkaPartitionLevelConsumer.java | 21 ++++++++-
.../impl/kafka/KafkaStreamConfigProperties.java | 3 ++
.../impl/kafka/KafkaLowLevelStreamConfigTest.java | 53 +++++++++++++++++++++-
.../kafka/KafkaPartitionLevelConsumerTest.java | 7 +++
6 files changed, 124 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index 23c5528..a3366a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -77,6 +77,8 @@ public class KafkaConnectionHandler {
boolean isPartitionProvided;
+ private final KafkaLowLevelStreamConfig _kafkaLowLevelStreamConfig;
+
@VisibleForTesting
public SimpleConsumer getSimpleConsumer() {
return _simpleConsumer;
@@ -109,19 +111,19 @@ public class KafkaConnectionHandler {
*/
public KafkaConnectionHandler(String clientId, StreamConfig streamConfig,
KafkaSimpleConsumerFactory simpleConsumerFactory) {
- KafkaLowLevelStreamConfig kafkaLowLevelStreamConfig = new
KafkaLowLevelStreamConfig(streamConfig);
+ _kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
_simpleConsumerFactory = simpleConsumerFactory;
_clientId = clientId;
- _topic = kafkaLowLevelStreamConfig.getKafkaTopicName();
+ _topic = _kafkaLowLevelStreamConfig.getKafkaTopicName();
_connectTimeoutMillis = streamConfig.getConnectionTimeoutMillis();
_simpleConsumer = null;
isPartitionProvided = false;
_partition = Integer.MIN_VALUE;
- _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
- _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
- initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
+ _bufferSize = _kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = _kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
+
initializeBootstrapNodeList(_kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -134,19 +136,19 @@ public class KafkaConnectionHandler {
public KafkaConnectionHandler(String clientId, StreamConfig streamConfig,
int partition,
KafkaSimpleConsumerFactory simpleConsumerFactory) {
- KafkaLowLevelStreamConfig kafkaLowLevelStreamConfig = new
KafkaLowLevelStreamConfig(streamConfig);
+ _kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
_simpleConsumerFactory = simpleConsumerFactory;
_clientId = clientId;
- _topic = kafkaLowLevelStreamConfig.getKafkaTopicName();
+ _topic = _kafkaLowLevelStreamConfig.getKafkaTopicName();
_connectTimeoutMillis = streamConfig.getConnectionTimeoutMillis();
_simpleConsumer = null;
isPartitionProvided = true;
_partition = partition;
- _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
- _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
- initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
+ _bufferSize = _kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = _kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
+
initializeBootstrapNodeList(_kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -180,6 +182,10 @@ public class KafkaConnectionHandler {
}
}
+ protected KafkaLowLevelStreamConfig getkafkaLowLevelStreamConfig() {
+ return _kafkaLowLevelStreamConfig;
+ }
+
abstract class State {
private ConsumerState stateValue;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index 76a5287..00ed1fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -30,10 +30,12 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
*/
public class KafkaLowLevelStreamConfig {
- private String _kafkaTopicName;
- private String _bootstrapHosts;
- private int _kafkaBufferSize;
- private int _kafkaSocketTimeout;
+ private final String _kafkaTopicName;
+ private final String _bootstrapHosts;
+ private final int _kafkaBufferSize;
+ private final int _kafkaSocketTimeout;
+ private final int _kafkaFetcherSizeBytes;
+ private final int _kafkaFetcherMinBytes;
/**
* Builds a wrapper around {@link StreamConfig} to fetch kafka partition
level consumer related configs
@@ -50,11 +52,18 @@ public class KafkaLowLevelStreamConfig {
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
String llcTimeoutKey = KafkaStreamConfigProperties
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+ String fetcherSizeKey = KafkaStreamConfigProperties
+
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+ String fetcherMinBytesKey = KafkaStreamConfigProperties
+
.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
_bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
_kafkaBufferSize = getIntConfigWithDefault(streamConfigMap, llcBufferKey,
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
_kafkaSocketTimeout = getIntConfigWithDefault(streamConfigMap,
llcTimeoutKey,
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+ _kafkaFetcherSizeBytes = getIntConfigWithDefault(streamConfigMap,
fetcherSizeKey, _kafkaBufferSize);
+ _kafkaFetcherMinBytes = getIntConfigWithDefault(streamConfigMap,
fetcherMinBytesKey,
+
KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
Preconditions.checkNotNull(_bootstrapHosts,
"Must specify kafka brokers list " + llcBrokerListKey + " in case of
low level kafka consumer");
}
@@ -75,6 +84,14 @@ public class KafkaLowLevelStreamConfig {
return _kafkaSocketTimeout;
}
+ public int getKafkaFetcherSizeBytes() {
+ return _kafkaFetcherSizeBytes;
+ }
+
+ public int getKafkaFetcherMinBytes() {
+ return _kafkaFetcherMinBytes;
+ }
+
private int getIntConfigWithDefault(Map<String, String> configMap, String
key, int defaultValue) {
String stringValue = configMap.get(key);
try {
@@ -91,7 +108,8 @@ public class KafkaLowLevelStreamConfig {
public String toString() {
return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" +
_kafkaTopicName + '\'' + ", _bootstrapHosts='"
+ _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize +
'\'' + ", _kafkaSocketTimeout='"
- + _kafkaSocketTimeout + '\'' + '}';
+ + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" +
_kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
+ + _kafkaFetcherMinBytes + '\'' + '}';
}
@Override
@@ -109,7 +127,9 @@ public class KafkaLowLevelStreamConfig {
return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) &&
EqualityUtils
.isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
.isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
- .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
+ .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) &&
EqualityUtils
+ .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) &&
EqualityUtils
+ .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
}
@Override
@@ -118,6 +138,8 @@ public class KafkaLowLevelStreamConfig {
result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+ result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+ result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
return result;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
index b40140c..003153c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
@@ -38,14 +38,21 @@ import org.slf4j.LoggerFactory;
public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler
implements PartitionLevelConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
+ private final int _fetchRequestMinBytes;
+ private final int _fetchRequestSizeBytes;
+
public KafkaPartitionLevelConsumer(String clientId, StreamConfig
streamConfig, int partition) {
super(clientId, streamConfig, partition, new
KafkaSimpleConsumerFactoryImpl());
+ _fetchRequestSizeBytes =
getkafkaLowLevelStreamConfig().getKafkaFetcherSizeBytes();
+ _fetchRequestMinBytes =
getkafkaLowLevelStreamConfig().getKafkaFetcherMinBytes();
}
@VisibleForTesting
public KafkaPartitionLevelConsumer(String clientId, StreamConfig
streamConfig, int partition,
KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
+ _fetchRequestSizeBytes =
getkafkaLowLevelStreamConfig().getKafkaFetcherSizeBytes();
+ _fetchRequestMinBytes =
getkafkaLowLevelStreamConfig().getKafkaFetcherMinBytes();
}
/**
@@ -75,8 +82,8 @@ public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler implemen
}
FetchResponse fetchResponse = _simpleConsumer.fetch(
- new FetchRequestBuilder().minBytes(100000).maxWait(timeoutMillis)
- .addFetch(_topic, _partition, startOffset, 500000).build());
+ new
FetchRequestBuilder().minBytes(_fetchRequestMinBytes).maxWait(timeoutMillis)
+ .addFetch(_topic, _partition, startOffset,
_fetchRequestSizeBytes).build());
if (!fetchResponse.hasError()) {
final Iterable<MessageAndOffset> messageAndOffsetIterable =
@@ -108,6 +115,16 @@ public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler implemen
});
}
+ @VisibleForTesting
+ public int getFetchRequestSize() {
+ return _fetchRequestSizeBytes;
+ }
+
+ @VisibleForTesting
+ public int getFetchRequestMinBytes() {
+ return _fetchRequestMinBytes;
+ }
+
@Override
/**
* Closes this consumer.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index e30bfd3..c46f44a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -46,6 +46,9 @@ public class KafkaStreamConfigProperties {
public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
+ public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size";
+ public static final String KAFKA_FETCHER_MIN_BYTES =
"kafka.fetcher.minBytes";
+ public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
}
public static final String KAFKA_CONSUMER_PROP_PREFIX =
"kafka.consumer.prop";
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
index a0a069f..bdb48cb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -29,7 +29,12 @@ import org.testng.annotations.Test;
public class KafkaLowLevelStreamConfigTest {
private KafkaLowLevelStreamConfig getStreamConfig(String topic, String
bootstrapHosts, String buffer,
- String socketTimeout) {
+ String socketTimeout) {
+ return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null,
null);
+ }
+
+ private KafkaLowLevelStreamConfig getStreamConfig(String topic, String
bootstrapHosts, String buffer,
+ String socketTimeout, String fetcherSize, String fetcherMinBytes) {
Map<String, String> streamConfigMap = new HashMap<>();
String streamType = "kafka";
String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
@@ -53,7 +58,13 @@ public class KafkaLowLevelStreamConfigTest {
streamConfigMap.put("stream.kafka.buffer.size", buffer);
}
if (socketTimeout != null) {
- streamConfigMap.put("stream.kafka.socket.timeout",
String.valueOf(socketTimeout));
+ streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+ }
+ if (fetcherSize != null) {
+ streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+ }
+ if (fetcherMinBytes != null) {
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
}
return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
}
@@ -109,4 +120,42 @@ public class KafkaLowLevelStreamConfigTest {
config = getStreamConfig("topic", "host1", "", "100");
Assert.assertEquals(100, config.getKafkaSocketTimeout());
}
+
+ @Test
+ public void testGetFetcherSize() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "",
"", "",null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "", null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "", "200", null);
+ Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+ }
+
+ @Test
+ public void testGetFetcherMinBytes() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "",
"", "", null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "", "", "100");
+ Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
index 173519a..165dcb4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
@@ -235,6 +235,8 @@ public class KafkaPartitionLevelConsumerTest {
streamConfigMap
.put("stream.kafka.consumer.factory.class.name",
mockKafkaSimpleConsumerFactory.getClass().getName());
streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
StreamConfig streamConfig = new StreamConfig(streamConfigMap);
KafkaStreamMetadataProvider streamMetadataProvider =
@@ -244,11 +246,16 @@ public class KafkaPartitionLevelConsumerTest {
KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
new KafkaPartitionLevelConsumer(clientId, streamConfig, 0,
mockKafkaSimpleConsumerFactory);
kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+ // test parsing values
+ Assert.assertEquals(10000,
kafkaSimpleStreamConsumer.getFetchRequestSize());
+ Assert.assertEquals(20000,
kafkaSimpleStreamConsumer.getFetchRequestMinBytes());
+
// test user defined values
streamConfigMap.put("stream.kafka.buffer.size", "100");
streamConfigMap.put("stream.kafka.socket.timeout", "1000");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]