[
https://issues.apache.org/jira/browse/GOBBLIN-1956?focusedWorklogId=890849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-890849
]
ASF GitHub Bot logged work on GOBBLIN-1956:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 16/Nov/23 03:01
Start Date: 16/Nov/23 03:01
Worklog Time Spent: 10m
Work Description: homatthew commented on code in PR #3827:
URL: https://github.com/apache/gobblin/pull/3827#discussion_r1395095423
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java:
##########
@@ -214,6 +220,19 @@ public LongWatermark getLwm() {
   public KafkaStreamingExtractor(WorkUnitState state) {
     super(state);
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    Map<KafkaPartition, LongWatermark> topicPartitionWatermarks =
+        getTopicPartitionWatermarks(this.topicPartitions);
+    if (this.maxAvgRecordSize > 0) {
+      long maxPollRecords =
+          state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES)
+              / maxAvgRecordSize;
Review Comment:
Makes sense. I think 50 MB seems like a good default.
Here's an example of how the throughput would look for some higher-volume
topics. All topics with records under 5 KB would be able to do 1,000 records
per second (pageviewevent is only ~800 bytes and URE is ~3 MB).
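A minimal sketch of the buffer-based computation the diff above performs: divide the configured buffer size by the observed max average record size to derive max.poll.records. The class name, the 50 MB default, and the Kafka-default fallback of 500 are illustrative assumptions for this sketch, not the PR's exact implementation.

```java
// Hypothetical sketch of deriving max.poll.records from record size.
// MaxPollRecordsSketch and the fallback value 500 are assumptions here;
// only the division by maxAvgRecordSize mirrors the diff in the PR.
public class MaxPollRecordsSketch {

  // Assumed default: a 50 MB consumer buffer, as suggested in the review.
  static final long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = 50L * 1024 * 1024;

  /** Derive max.poll.records from the max average record size in bytes. */
  static long computeMaxPollRecords(long bufferSizeBytes, long maxAvgRecordSize) {
    if (maxAvgRecordSize <= 0) {
      // No size information yet: fall back to Kafka's default max.poll.records.
      return 500;
    }
    // Never return 0 for very large records; poll at least one record.
    return Math.max(1, bufferSizeBytes / maxAvgRecordSize);
  }

  public static void main(String[] args) {
    // ~800-byte records (the pageviewevent example): many records per poll.
    System.out.println(
        computeMaxPollRecords(DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 800L));
    // ~3 MB records (the URE example): only a handful of records per poll.
    System.out.println(
        computeMaxPollRecords(DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 3L * 1024 * 1024));
  }
}
```

With these assumed numbers, small-record topics get a much larger poll batch than large-record topics, which is the per-topic tuning the issue asks for.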
Issue Time Tracking
-------------------
Worklog Id: (was: 890849)
Time Spent: 1h (was: 50m)
> Make Kafka streaming pipeline be able to config the max poll records during
> runtime
> -----------------------------------------------------------------------------------
>
> Key: GOBBLIN-1956
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1956
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Kafka's max.poll.records setting affects the performance of the Kafka
> consumer. Configuring it to a lower number means it takes more time to
> process all records, while configuring it to a higher number means we need
> more memory to hold the records. So we should use the average record size
> to determine the value for each topic.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)