This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 565171f  add config to control kafka fetcher size and increase default (#3869)
565171f is described below

commit 565171f23cc4af4e5a2c68b0d64194561a34ea9a
Author: James Shao <[email protected]>
AuthorDate: Mon Mar 18 17:08:32 2019 -0700

    add config to control kafka fetcher size and increase default (#3869)
    
    * add config to control kafka fetcher size and increase default
    
    * update per cr feedback
    
    * change default back to old value
---
 .../impl/kafka/KafkaConnectionHandler.java         | 26 +++++++----
 .../impl/kafka/KafkaLowLevelStreamConfig.java      | 34 +++++++++++---
 .../impl/kafka/KafkaPartitionLevelConsumer.java    | 21 ++++++++-
 .../impl/kafka/KafkaStreamConfigProperties.java    |  3 ++
 .../impl/kafka/KafkaLowLevelStreamConfigTest.java  | 53 +++++++++++++++++++++-
 .../kafka/KafkaPartitionLevelConsumerTest.java     |  7 +++
 6 files changed, 124 insertions(+), 20 deletions(-)
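
Note: the two new knobs added in this patch are plain entries in the table's stream config map: "stream.kafka.fetcher.size" (fetch request size in bytes, defaulting to the configured buffer size) and "stream.kafka.fetcher.minBytes" (fetch request minBytes, defaulting to 100000). The sketch below, modeled on the test setup in this patch, shows how they might be supplied; the key names and values other than the properties visible in the diff are illustrative assumptions and may differ in a real table config.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.realtime.impl.kafka.KafkaLowLevelStreamConfig;
    import org.apache.pinot.core.realtime.stream.StreamConfig;

    public class FetcherConfigSketch {
      public static void main(String[] args) {
        Map<String, String> streamConfigMap = new HashMap<>();
        // Usual low-level consumer settings; these key names and values are
        // illustrative assumptions, not taken from this diff.
        streamConfigMap.put("streamType", "kafka");
        streamConfigMap.put("stream.kafka.consumer.type", "lowlevel");
        streamConfigMap.put("stream.kafka.topic.name", "myTopic");
        streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
        streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
        // New properties introduced by this commit.
        streamConfigMap.put("stream.kafka.fetcher.size", "1048576");    // fetch request size in bytes
        streamConfigMap.put("stream.kafka.fetcher.minBytes", "100000"); // fetch request minBytes
        KafkaLowLevelStreamConfig config =
            new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
        System.out.println(config.getKafkaFetcherSizeBytes() + " / " + config.getKafkaFetcherMinBytes());
      }
    }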

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
index 23c5528..a3366a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaConnectionHandler.java
@@ -77,6 +77,8 @@ public class KafkaConnectionHandler {
 
   boolean isPartitionProvided;
 
+  private final KafkaLowLevelStreamConfig _kafkaLowLevelStreamConfig;
+
   @VisibleForTesting
   public SimpleConsumer getSimpleConsumer() {
     return _simpleConsumer;
@@ -109,19 +111,19 @@ public class KafkaConnectionHandler {
    */
   public KafkaConnectionHandler(String clientId, StreamConfig streamConfig,
       KafkaSimpleConsumerFactory simpleConsumerFactory) {
-    KafkaLowLevelStreamConfig kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
+    _kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
     _simpleConsumerFactory = simpleConsumerFactory;
     _clientId = clientId;
-    _topic = kafkaLowLevelStreamConfig.getKafkaTopicName();
+    _topic = _kafkaLowLevelStreamConfig.getKafkaTopicName();
     _connectTimeoutMillis = streamConfig.getConnectionTimeoutMillis();
     _simpleConsumer = null;
 
     isPartitionProvided = false;
     _partition = Integer.MIN_VALUE;
 
-    _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
-    _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
-    initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
+    _bufferSize = _kafkaLowLevelStreamConfig.getKafkaBufferSize();
+    _socketTimeout = _kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
+    initializeBootstrapNodeList(_kafkaLowLevelStreamConfig.getBootstrapHosts());
     setCurrentState(new ConnectingToBootstrapNode());
   }
 
@@ -134,19 +136,19 @@ public class KafkaConnectionHandler {
  public KafkaConnectionHandler(String clientId, StreamConfig streamConfig, int partition,
       KafkaSimpleConsumerFactory simpleConsumerFactory) {
 
-    KafkaLowLevelStreamConfig kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
+    _kafkaLowLevelStreamConfig = new KafkaLowLevelStreamConfig(streamConfig);
     _simpleConsumerFactory = simpleConsumerFactory;
     _clientId = clientId;
-    _topic = kafkaLowLevelStreamConfig.getKafkaTopicName();
+    _topic = _kafkaLowLevelStreamConfig.getKafkaTopicName();
     _connectTimeoutMillis = streamConfig.getConnectionTimeoutMillis();
     _simpleConsumer = null;
 
     isPartitionProvided = true;
     _partition = partition;
 
-    _bufferSize = kafkaLowLevelStreamConfig.getKafkaBufferSize();
-    _socketTimeout = kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
-    initializeBootstrapNodeList(kafkaLowLevelStreamConfig.getBootstrapHosts());
+    _bufferSize = _kafkaLowLevelStreamConfig.getKafkaBufferSize();
+    _socketTimeout = _kafkaLowLevelStreamConfig.getKafkaSocketTimeout();
+    initializeBootstrapNodeList(_kafkaLowLevelStreamConfig.getBootstrapHosts());
     setCurrentState(new ConnectingToBootstrapNode());
   }
 
@@ -180,6 +182,10 @@ public class KafkaConnectionHandler {
     }
   }
 
+  protected KafkaLowLevelStreamConfig getkafkaLowLevelStreamConfig() {
+    return _kafkaLowLevelStreamConfig;
+  }
+
   abstract class State {
     private ConsumerState stateValue;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
index 76a5287..00ed1fd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfig.java
@@ -30,10 +30,12 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
  */
 public class KafkaLowLevelStreamConfig {
 
-  private String _kafkaTopicName;
-  private String _bootstrapHosts;
-  private int _kafkaBufferSize;
-  private int _kafkaSocketTimeout;
+  private final String _kafkaTopicName;
+  private final String _bootstrapHosts;
+  private final int _kafkaBufferSize;
+  private final int _kafkaSocketTimeout;
+  private final int _kafkaFetcherSizeBytes;
+  private final int _kafkaFetcherMinBytes;
 
   /**
   * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
@@ -50,11 +52,18 @@ public class KafkaLowLevelStreamConfig {
        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
    String llcTimeoutKey = KafkaStreamConfigProperties
        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+    String fetcherSizeKey = KafkaStreamConfigProperties
+        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+    String fetcherMinBytesKey = KafkaStreamConfigProperties
+        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
    _bootstrapHosts = streamConfigMap.get(llcBrokerListKey);
    _kafkaBufferSize = getIntConfigWithDefault(streamConfigMap, llcBufferKey,
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
    _kafkaSocketTimeout = getIntConfigWithDefault(streamConfigMap, llcTimeoutKey,
        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+    _kafkaFetcherSizeBytes = getIntConfigWithDefault(streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
+    _kafkaFetcherMinBytes = getIntConfigWithDefault(streamConfigMap, fetcherMinBytesKey,
+        KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
    Preconditions.checkNotNull(_bootstrapHosts,
        "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
   }
@@ -75,6 +84,14 @@ public class KafkaLowLevelStreamConfig {
     return _kafkaSocketTimeout;
   }
 
+  public int getKafkaFetcherSizeBytes() {
+    return _kafkaFetcherSizeBytes;
+  }
+
+  public int getKafkaFetcherMinBytes() {
+    return _kafkaFetcherMinBytes;
+  }
+
  private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
     String stringValue = configMap.get(key);
     try {
@@ -91,7 +108,8 @@ public class KafkaLowLevelStreamConfig {
   public String toString() {
     return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + 
_kafkaTopicName + '\'' + ", _bootstrapHosts='"
         + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + 
'\'' + ", _kafkaSocketTimeout='"
-        + _kafkaSocketTimeout + '\'' + '}';
+        + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + 
_kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
+        + _kafkaFetcherMinBytes + '\'' + '}';
   }
 
   @Override
@@ -109,7 +127,9 @@ public class KafkaLowLevelStreamConfig {
    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
        .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
        .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
-        .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout);
+        .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
+        .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
+        .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
   }
 
   @Override
@@ -118,6 +138,8 @@ public class KafkaLowLevelStreamConfig {
     result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
     result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
     result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
     return result;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
index b40140c..003153c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaPartitionLevelConsumer.java
@@ -38,14 +38,21 @@ import org.slf4j.LoggerFactory;
 public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
+  private final int _fetchRequestMinBytes;
+  private final int _fetchRequestSizeBytes;
+
  public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
    super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl());
+    _fetchRequestSizeBytes = getkafkaLowLevelStreamConfig().getKafkaFetcherSizeBytes();
+    _fetchRequestMinBytes = getkafkaLowLevelStreamConfig().getKafkaFetcherMinBytes();
   }
 
   @VisibleForTesting
  public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition,
      KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
    super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
+    _fetchRequestSizeBytes = getkafkaLowLevelStreamConfig().getKafkaFetcherSizeBytes();
+    _fetchRequestMinBytes = getkafkaLowLevelStreamConfig().getKafkaFetcherMinBytes();
   }
 
   /**
@@ -75,8 +82,8 @@ public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler implemen
     }
 
     FetchResponse fetchResponse = _simpleConsumer.fetch(
-        new FetchRequestBuilder().minBytes(100000).maxWait(timeoutMillis)
-            .addFetch(_topic, _partition, startOffset, 500000).build());
+        new FetchRequestBuilder().minBytes(_fetchRequestMinBytes).maxWait(timeoutMillis)
+            .addFetch(_topic, _partition, startOffset, _fetchRequestSizeBytes).build());
 
     if (!fetchResponse.hasError()) {
       final Iterable<MessageAndOffset> messageAndOffsetIterable =
@@ -108,6 +115,16 @@ public class KafkaPartitionLevelConsumer extends KafkaConnectionHandler implemen
     });
   }
 
+  @VisibleForTesting
+  public int getFetchRequestSize() {
+    return _fetchRequestSizeBytes;
+  }
+
+  @VisibleForTesting
+  public int getFetchRequestMinBytes() {
+    return _fetchRequestMinBytes;
+  }
+
   @Override
   /**
    * Closes this consumer.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
index e30bfd3..c46f44a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStreamConfigProperties.java
@@ -46,6 +46,9 @@ public class KafkaStreamConfigProperties {
     public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
     public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
     public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
+    public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size";
+    public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
+    public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
   }
 
  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
index a0a069f..bdb48cb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/kafka/KafkaLowLevelStreamConfigTest.java
@@ -29,7 +29,12 @@ import org.testng.annotations.Test;
 public class KafkaLowLevelStreamConfigTest {
 
  private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
-      String socketTimeout) {
+                                                    String socketTimeout) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null);
+  }
+
+  private KafkaLowLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes) {
     Map<String, String> streamConfigMap = new HashMap<>();
     String streamType = "kafka";
     String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
@@ -53,7 +58,13 @@ public class KafkaLowLevelStreamConfigTest {
       streamConfigMap.put("stream.kafka.buffer.size", buffer);
     }
     if (socketTimeout != null) {
-      streamConfigMap.put("stream.kafka.socket.timeout", 
String.valueOf(socketTimeout));
+      streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+    }
+    if (fetcherSize != null) {
+      streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+    }
+    if (fetcherMinBytes != null) {
+      streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
     }
     return new KafkaLowLevelStreamConfig(new StreamConfig(streamConfigMap));
   }
@@ -109,4 +120,42 @@ public class KafkaLowLevelStreamConfigTest {
     config = getStreamConfig("topic", "host1", "", "100");
     Assert.assertEquals(100, config.getKafkaSocketTimeout());
   }
+
+  @Test
+  public void testGetFetcherSize() {
+    // test default
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "",null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "", "200", null);
+    Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+  }
+
+  @Test
+  public void testGetFetcherMinBytes() {
+    // test default
+    KafkaLowLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "", "", "100");
+    Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
index 173519a..165dcb4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/kafka/KafkaPartitionLevelConsumerTest.java
@@ -235,6 +235,8 @@ public class KafkaPartitionLevelConsumerTest {
     streamConfigMap
         .put("stream.kafka.consumer.factory.class.name", 
mockKafkaSimpleConsumerFactory.getClass().getName());
     streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+    streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
     StreamConfig streamConfig = new StreamConfig(streamConfigMap);
 
     KafkaStreamMetadataProvider streamMetadataProvider =
@@ -244,11 +246,16 @@ public class KafkaPartitionLevelConsumerTest {
     KafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
        new KafkaPartitionLevelConsumer(clientId, streamConfig, 0, mockKafkaSimpleConsumerFactory);
     kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
        kafkaSimpleStreamConsumer.getSimpleConsumer().bufferSize());
    Assert.assertEquals(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
         kafkaSimpleStreamConsumer.getSimpleConsumer().soTimeout());
 
+    // test parsing values
+    Assert.assertEquals(10000, kafkaSimpleStreamConsumer.getFetchRequestSize());
+    Assert.assertEquals(20000, kafkaSimpleStreamConsumer.getFetchRequestMinBytes());
+
     // test user defined values
     streamConfigMap.put("stream.kafka.buffer.size", "100");
     streamConfigMap.put("stream.kafka.socket.timeout", "1000");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
