[
https://issues.apache.org/jira/browse/GOBBLIN-1956?focusedWorklogId=890802&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-890802
]
ASF GitHub Bot logged work on GOBBLIN-1956:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Nov/23 21:43
Start Date: 15/Nov/23 21:43
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3827:
URL: https://github.com/apache/gobblin/pull/3827#discussion_r1394841114
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java:
##########
@@ -214,6 +220,19 @@ public LongWatermark getLwm() {
public KafkaStreamingExtractor(WorkUnitState state) {
super(state);
+ this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+ Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = getTopicPartitionWatermarks(this.topicPartitions);
+ if (this.maxAvgRecordSize > 0) {
+ long maxPollRecords = state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;
Review Comment:
There is no relationship between those two configs. The value should not be
too small, since we need to guarantee performance, and not too large, since we
want to avoid OOM issues. So I set 50MB as the default here. This should only
take effect when record sizes are large; for normal topics, it falls back to
KAFKA_MAX_POLL_RECORDS_KEY.
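The sizing rule described above (bound the poll batch by a byte budget divided by the average record size, falling back to the plain record-count config for normal topics) can be sketched as follows. This is a hedged illustration, not the actual Gobblin code: the class, method, and default values here are hypothetical, modeled on the 50MB default and fallback behavior mentioned in the comment.

```java
// Hypothetical sketch of the max.poll.records derivation discussed above.
// Names and defaults are illustrative, not the real KafkaStreamingExtractor code.
public final class MaxPollRecordsSketch {

  // Assumed defaults: 50MB byte budget (per the review comment) and a
  // placeholder record-count fallback for normal-sized topics.
  static final long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = 50L * 1024 * 1024;
  static final long DEFAULT_MAX_POLL_RECORDS = 100;

  /**
   * Derives max.poll.records. When the observed average record size is large
   * (positive), bound the batch so bufferSizeBytes is not exceeded; otherwise
   * fall back to the plain record-count setting (KAFKA_MAX_POLL_RECORDS_KEY
   * in the real code).
   */
  static long computeMaxPollRecords(long maxAvgRecordSize, long bufferSizeBytes) {
    if (maxAvgRecordSize <= 0) {
      // Normal topic: no size-based bound, use the configured record count.
      return DEFAULT_MAX_POLL_RECORDS;
    }
    // Large-record topic: byte budget / avg record size, at least 1 record.
    return Math.max(1, bufferSizeBytes / maxAvgRecordSize);
  }

  public static void main(String[] args) {
    // 1MB average records with a 50MB budget => poll at most 50 records.
    System.out.println(computeMaxPollRecords(1024 * 1024, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES));
  }
}
```

The `Math.max(1, ...)` guard keeps the batch size from degenerating to zero when a single record is larger than the whole byte budget.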
Issue Time Tracking
-------------------
Worklog Id: (was: 890802)
Time Spent: 50m (was: 40m)
> Make Kafka streaming pipeline be able to config the max poll records during
> runtime
> -----------------------------------------------------------------------------------
>
> Key: GOBBLIN-1956
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1956
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Kafka's max.poll.records affects the performance of the Kafka consumer.
> Configuring it to a lower number means more time is spent fetching records,
> while configuring it to a higher number means more memory is needed to hold
> the records. So we should use the average record size to determine the value
> for different topics.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)