snleee closed pull request #3584: add kafka simple consumer buffer and timeout to stream config
URL: https://github.com/apache/incubator-pinot/pull/3584
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index b47caf9cb4..d7f6d1ff2c 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.core.realtime.impl.kafka;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -42,8 +43,6 @@
*/
public class KafkaConnectionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectionHandler.class);
- private static final int SOCKET_TIMEOUT_MILLIS = 10000;
- private static final int SOCKET_BUFFER_SIZE = 512000;
enum ConsumerState {
CONNECTING_TO_BOOTSTRAP_NODE,
@@ -65,6 +64,8 @@
KafkaBrokerWrapper _leader;
String _currentHost;
int _currentPort;
+ int _bufferSize;
+ int _socketTimeout;
final KafkaSimpleConsumerFactory _simpleConsumerFactory;
SimpleConsumer _simpleConsumer;
@@ -73,6 +74,11 @@
boolean isPartitionProvided;
+ @VisibleForTesting
+ public SimpleConsumer getSimpleConsumer() {
+ return _simpleConsumer;
+ }
+
/**
* A Kafka protocol error that indicates a situation that is not likely to clear up by retrying the request (for
* example, no such topic or offset out of range).
@@ -110,6 +116,8 @@ public KafkaConnectionHandler(String clientId, StreamConfig streamConfig,
isPartitionProvided = false;
_partition = Integer.MIN_VALUE;
+ _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -133,6 +141,8 @@ public KafkaConnectionHandler(String clientId, StreamConfig streamConfig, int pa
isPartitionProvided = true;
_partition = partition;
+ _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
+ _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
setCurrentState(new ConnectingToBootstrapNode());
}
@@ -216,8 +226,8 @@ public void process() {
try {
LOGGER.info("Connecting to bootstrap host {}:{} for topic {}", _currentHost, _currentPort, _topic);
- _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumer = _simpleConsumerFactory.buildSimpleConsumer(_currentHost, _currentPort, _socketTimeout,
+ _bufferSize, _clientId);
setCurrentState(new ConnectedToBootstrapNode());
} catch (Exception e) {
handleConsumerException(e);
@@ -326,8 +336,8 @@ void process() {
// Connect to the partition leader
try {
_simpleConsumer =
- _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), SOCKET_TIMEOUT_MILLIS,
- SOCKET_BUFFER_SIZE, _clientId);
+ _simpleConsumerFactory.buildSimpleConsumer(_leader.host(), _leader.port(), _socketTimeout,
+ _bufferSize, _clientId);
setCurrentState(new ConnectedToPartitionLeader());
} catch (Exception e) {
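For context, both call sites above now thread the configured values into the same factory signature that was previously fed the hard-coded constants. A minimal sketch of a compatible factory, assuming the kafka.javaapi SimpleConsumer from the old Kafka consumer API (the class name here is hypothetical; Pinot's KafkaSimpleConsumerFactory defines the real contract):

import kafka.javaapi.consumer.SimpleConsumer;

// Hypothetical illustration, not part of the diff.
public class ExampleSimpleConsumerFactory {
  public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
    // soTimeout and bufferSize now come from the stream config (_socketTimeout, _bufferSize)
    // instead of the removed SOCKET_TIMEOUT_MILLIS / SOCKET_BUFFER_SIZE constants.
    return new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
  }
}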
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index de63f5a8ca..1b3a6c8542 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -18,6 +18,8 @@
import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.utils.EqualityUtils;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import org.apache.commons.lang.StringUtils;
+
import java.util.Map;
@@ -28,6 +30,8 @@
private String _kafkaTopicName;
private String _bootstrapHosts;
+ private int _kafkaBufferSize;
+ private int _kafkaSocketTimeout;
/**
* Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
@@ -40,7 +44,15 @@ public KafkaLowLevelStreamConfig(StreamConfig streamConfig) {
String llcBrokerListKey =
KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+ String llcBufferKey =
+ KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+ String llcTimeoutKey =
+ KafkaStreamConfigProperties.constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
_bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
+ _kafkaBufferSize = getIntConfigWithDefault(streamConfigMap, llcBufferKey,
+ KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+ _kafkaSocketTimeout = getIntConfigWithDefault(streamConfigMap, llcTimeoutKey,
+ KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
Preconditions.checkNotNull(_bootstrapHosts,
"Must specify kafka brokers list " + llcBrokerListKey + " in case of
low level kafka consumer");
}
@@ -53,12 +65,36 @@ public String getBootstrapHosts() {
return _bootstrapHosts;
}
+ public int getKafkaBufferSize() {
+ return _kafkaBufferSize;
+ }
+
+ public int getKafkaSocketTimeout() {
+ return _kafkaSocketTimeout;
+ }
+
+ private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+ String stringValue = configMap.get(key);
+ try {
+ if (StringUtils.isNotEmpty(stringValue)) {
+ return Integer.parseInt(stringValue);
+ }
+ return defaultValue;
+ } catch (NumberFormatException ex) {
+ return defaultValue;
+ }
+ }
+
@Override
public String toString() {
- return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" +
_kafkaTopicName + '\'' + ", _bootstrapHosts='"
- + _bootstrapHosts + '\'' + '}';
+ return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" +
_kafkaTopicName + '\''
+ + ", _bootstrapHosts='" + _bootstrapHosts + '\''
+ + ", _kafkaBufferSize='" + _kafkaBufferSize + '\''
+ + ", _kafkaSocketTimeout='" + _kafkaSocketTimeout + '\''
+ + '}';
}
+
@Override
public boolean equals(Object o) {
if (EqualityUtils.isSameReference(this, o)) {
@@ -71,14 +107,18 @@ public boolean equals(Object o) {
KafkaLowLevelStreamConfig that = (KafkaLowLevelStreamConfig) o;
- return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils.isEqual(_bootstrapHosts,
- that._bootstrapHosts);
+ return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName)
+ && EqualityUtils.isEqual(_bootstrapHosts, that._bootstrapHosts)
+ && EqualityUtils.isEqual(_kafkaBufferSize, that._kafkaBufferSize)
+ && EqualityUtils.isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
}
@Override
public int hashCode() {
int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+ result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+ result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
return result;
}
}
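The net behavior of getIntConfigWithDefault is that a missing, empty, or non-numeric value falls back to the supplied default rather than failing table creation. A self-contained sketch of the same pattern (standalone example, not part of the diff; the class name is hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;

public class IntConfigDefaultSketch {
  static int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
    String stringValue = configMap.get(key);
    try {
      if (StringUtils.isNotEmpty(stringValue)) {
        return Integer.parseInt(stringValue);
      }
      return defaultValue;  // key absent or empty: keep the default
    } catch (NumberFormatException ex) {
      return defaultValue;  // non-numeric value: keep the default
    }
  }

  public static void main(String[] args) {
    Map<String, String> cfg = new HashMap<>();
    cfg.put("stream.kafka.buffer.size", "100");
    cfg.put("stream.kafka.socket.timeout", "bad value");
    System.out.println(getIntConfigWithDefault(cfg, "stream.kafka.buffer.size", 512000));   // 100
    System.out.println(getIntConfigWithDefault(cfg, "stream.kafka.socket.timeout", 10000)); // 10000 (fallback)
    System.out.println(getIntConfigWithDefault(cfg, "absent.key", 10000));                  // 10000 (fallback)
  }
}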
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index cc5bcf010d..be62ec583e 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -39,6 +39,10 @@
public static class LowLevelConsumer {
public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+ public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
+ public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
+ public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
+ public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
}
public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
new file mode 100644
index 0000000000..289e14086d
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.realtime.impl.kafka;
+
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaLowLevelStreamConfigTest {
+
+ private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer, String socketTimeout) {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ String streamType = "kafka";
+ String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+ String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
+ String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClassName);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
+ streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+ if (buffer != null) {
+ streamConfigMap.put("stream.kafka.buffer.size", buffer);
+ }
+ if (socketTimeout != null) {
+ streamConfigMap.put("stream.kafka.socket.timeout",
String.valueOf(socketTimeout));
+ }
+ return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
+ }
+
+ @Test
+ public void testGetKafkaTopicName() {
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+ Assert.assertEquals("topic", config.getKafkaTopicName());
+ }
+
+ @Test
+ public void testGetBootstrapHosts() {
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals("host1", config.getBootstrapHosts());
+ }
+
+ @Test
+ public void testGetKafkaBufferSize() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", null,
"");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "bad value", "");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
config.getKafkaBufferSize());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "");
+ Assert.assertEquals(100, config.getKafkaBufferSize());
+ }
+
+ @Test
+ public void testGetKafkaSocketTimeout() {
+ // test default
+ KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1",
"",null);
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "","");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "","bad value");
+
Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
config.getKafkaSocketTimeout());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "100");
+ Assert.assertEquals(100, config.getKafkaSocketTimeout());
+ }
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
index 73adca51cc..43160081d8 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
@@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaPartitionLevelConsumer;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaSimpleConsumerFactory;
+import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
import com.linkedin.pinot.core.realtime.impl.kafka.KafkaStreamMetadataProvider;
import com.linkedin.pinot.core.realtime.stream.OffsetCriteria;
import com.linkedin.pinot.core.realtime.stream.StreamConfig;
@@ -208,6 +209,55 @@ public SimpleConsumer buildSimpleConsumer(String host, int port, int soTimeout,
}
}
+ @Test
+ public void testBuildConsumer() throws Exception {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = "abcd:1234,bcde:2345";
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ MockKafkaSimpleConsumerFactory mockKafkaSimpleConsumerFactory = new MockKafkaSimpleConsumerFactory(
+ new String[] { "abcd", "bcde" },
+ new int[] { 1234, 2345 },
+ new long[] { 12345L, 23456L },
+ new long[] { 23456L, 34567L },
+ new int[] { 0, 1 },
+ streamKafkaTopicName
+ );
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name",
mockKafkaSimpleConsumerFactory.getClass().getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ KafkaStreamMetadataProvider streamMetadataProvider =
+ new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
+
+ // test default value
+ KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
+ new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+ Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+
+ // test user defined values
+ streamConfigMap.put("stream.kafka.buffer.size", "100");
+ streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+ streamConfig = new StreamConfig(streamConfigMap);
+ kafkaSimpleStreamConsumer =
+ new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ Assert.assertEquals(100, kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
+ Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
+ }
+
@Test
public void testGetPartitionCount() {
String streamType = "kafka";
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]