[ 
https://issues.apache.org/jira/browse/GOBBLIN-1956?focusedWorklogId=890802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-890802
 ]

ASF GitHub Bot logged work on GOBBLIN-1956:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Nov/23 21:43
            Start Date: 15/Nov/23 21:43
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3827:
URL: https://github.com/apache/gobblin/pull/3827#discussion_r1394841114


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java:
##########
@@ -214,6 +220,19 @@ public LongWatermark getLwm() {
 
   public KafkaStreamingExtractor(WorkUnitState state) {
     super(state);
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = getTopicPartitionWatermarks(this.topicPartitions);
+    if (this.maxAvgRecordSize > 0) {
+      long maxPollRecords =
+          state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;

Review Comment:
   There is no relationship between those two configs. The value should not be 
too small, since we need to guarantee throughput, and not too large, since we 
want to avoid OOM issues, so I used 50MB as the default here. This should only 
take effect when record sizes are large; for normal topics it falls back to 
KAFKA_MAX_POLL_RECORDS_KEY.
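
For readers skimming the thread, here is a minimal self-contained sketch of 
the sizing rule described in the comment above, assuming the 50MB default 
buffer budget and the static fallback it mentions; the class and method names 
are hypothetical, not the PR's actual code:

public class MaxPollRecordsSizer {

  // Assumed default from the discussion above: a 50MB per-poll buffer budget.
  static final long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = 50L * 1024 * 1024;

  // Derive max.poll.records so that a single poll never buffers more than
  // maxBufferBytes; fall back to the statically configured value
  // (KAFKA_MAX_POLL_RECORDS_KEY in the PR) when no size estimate exists.
  static long computeMaxPollRecords(long maxAvgRecordSize, long maxBufferBytes,
      long configuredMaxPollRecords) {
    if (maxAvgRecordSize <= 0) {
      return configuredMaxPollRecords;  // normal topic: keep the static config
    }
    return Math.max(1, maxBufferBytes / maxAvgRecordSize);  // large records: cap by size
  }

  public static void main(String[] args) {
    // Topic with ~1MB records: 50MB / 1MB => at most 50 records per poll.
    System.out.println(computeMaxPollRecords(1_048_576, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 500));
    // No size estimate yet: the configured 500 is kept.
    System.out.println(computeMaxPollRecords(0, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 500));
  }
}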





Issue Time Tracking
-------------------

    Worklog Id:     (was: 890802)
    Time Spent: 50m  (was: 40m)

> Make Kafka streaming pipeline be able to config the max poll records during 
> runtime
> -----------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1956
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1956
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Kafka max.poll.records affects the performance of the Kafka consumer.
> Configuring it to a lower number means each record takes more time to 
> process, while configuring it to a higher number means we need more memory 
> to hold the records. So we should use the average record size to determine 
> the value for different topics.
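
As a concrete illustration of that sizing rule, here is a hedged sketch of 
how a derived value would be applied to a plain Kafka consumer; the broker 
address, group id, and the 100KB average record size are made-up example 
values, not anything from this issue:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SizedConsumerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // example broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "gobblin-streaming-example");     // example group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // 50MB buffer budget / 100KB average record size => 512 records per poll.
    long maxPollRecords = (50L * 1024 * 1024) / (100L * 1024);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Long.toString(maxPollRecords));

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    consumer.close();
  }
}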



