ZihanLi58 commented on code in PR #3827:
URL: https://github.com/apache/gobblin/pull/3827#discussion_r1394841114


##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java:
##########
@@ -214,6 +220,19 @@ public LongWatermark getLwm() {
 
   public KafkaStreamingExtractor(WorkUnitState state) {
     super(state);
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = 
getTopicPartitionWatermarks(this.topicPartitions);
+    if (this.maxAvgRecordSize > 0 ) {
+      long maxPollRecords =
+          state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 
DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;

Review Comment:
   there is no relationship between those two config.  The value should not be 
super small as we need to guarantee the performance, and not super large as we 
want to avoid OOM issue. So I gave 50MB here as default. And this should only 
take effect when record size are large, for normal topic, it should fall back 
to use KAFKA_MAX_POLL_RECORDS_KEY



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to