This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new af5299d5b [GOBBLIN-1956]Make Kafka streaming pipeline be able to 
config the max poll records during runtime (#3827)
af5299d5b is described below

commit af5299d5b3e01e2587dbecf315a18973bba1eb42
Author: Zihan Li <[email protected]>
AuthorDate: Thu Nov 16 15:19:16 2023 -0800

    [GOBBLIN-1956]Make Kafka streaming pipeline be able to config the max poll 
records during runtime (#3827)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * add unit test
    
    * fix typo
    
    * [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max 
poll records during runtime
    
    * small refactor
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../extract/kafka/KafkaStreamingExtractor.java     | 23 ++++++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 3fa4d4c28..8320a82ec 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -87,6 +87,11 @@ import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
 public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, 
DecodeableKafkaRecord> {
   public static final String DATASET_KEY = "dataset";
   public static final String DATASET_PARTITION_KEY = "datasetPartition";
+  public static final String MAX_KAFKA_BUFFER_SIZE_IN_BYTES = 
"kafka.streaming.max.kafka.buffer.size.in.bytes";
+  public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES = 
Long.valueOf(50 * 1024 * 1024);
+  // Max number of records to be pulled in single polling.
+  private static final String KAFKA_MAX_POLL_RECORDS_KEY = 
"kafka.consumer.maxPollRecords";
+  private static final int DEFAULT_MAX_POLL_RECORDS = 100;
   private static final Long MAX_LOG_ERRORS = 100L;
 
   private static final String 
KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
@@ -115,6 +120,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
   protected MultiLongWatermark lowWatermark;
   protected MultiLongWatermark highWatermark;
   protected MultiLongWatermark nextWatermark;
+  protected long maxAvgRecordSize = -1;
   protected Map<Integer, DecodeableKafkaRecord> 
perPartitionLastSuccessfulRecord;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
 
@@ -214,6 +220,19 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
 
   public KafkaStreamingExtractor(WorkUnitState state) {
     super(state);
+    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+    Map<KafkaPartition, LongWatermark> topicPartitionWatermarks = 
getTopicPartitionWatermarks(this.topicPartitions);
+    if (this.maxAvgRecordSize > 0 ) {
+      long maxPollRecords =
+          state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES, 
DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;
+      maxPollRecords = Math.min(maxPollRecords, 
state.getPropAsInt(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS));
+      state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, maxPollRecords);
+      log.info("set max.poll.records to be " + maxPollRecords);
+    } else {
+      // As there is no avg record size available, using lower number to make 
sure we don't hit OOM issue
+      state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS);
+      log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS);
+    }
     this.kafkaConsumerClientResolver =
         new 
ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
     try {
@@ -229,8 +248,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
     this._schemaRegistry = 
state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
         KafkaSchemaRegistry.<String, S>get(state.getProperties())) : 
Optional.<KafkaSchemaRegistry<String, S>>absent();
 
-    this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
-    this.kafkaConsumerClient.assignAndSeek(topicPartitions, 
getTopicPartitionWatermarks(this.topicPartitions));
+    this.kafkaConsumerClient.assignAndSeek(topicPartitions, 
topicPartitionWatermarks);
     this.messageIterator = this.kafkaConsumerClient.consume();
 
     this.partitions = KafkaUtils.getPartitions(state);
@@ -292,6 +310,7 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
       if (kafkaWatermarkMap.containsKey(topicPartitionString)) {
         LongWatermark longWatermark = ((KafkaWatermark) 
kafkaWatermarkMap.get(topicPartitionString)).getLwm();
         longWatermarkMap.put(topicPartition, longWatermark);
+        maxAvgRecordSize = Math.max(maxAvgRecordSize, ((KafkaWatermark) 
kafkaWatermarkMap.get(topicPartitionString)).getAvgRecordSize());
       } else {
         longWatermarkMap.put(topicPartition, new LongWatermark(0L));
       }

Reply via email to