This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new af5299d5b [GOBBLIN-1956]Make Kafka streaming pipeline be able to
config the max poll records during runtime (#3827)
af5299d5b is described below
commit af5299d5b3e01e2587dbecf315a18973bba1eb42
Author: Zihan Li <[email protected]>
AuthorDate: Thu Nov 16 15:19:16 2023 -0800
[GOBBLIN-1956]Make Kafka streaming pipeline be able to config the max poll
records during runtime (#3827)
* address comments
* use connection manager when httpclient is not closeable
* add unit test
* fix typo
* [GOBBLIN-1956] Make Kafka streaming pipeline be able to config the max
poll records during runtime
* small refactor
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../extract/kafka/KafkaStreamingExtractor.java | 23 ++++++++++++++++++++--
1 file changed, 21 insertions(+), 2 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 3fa4d4c28..8320a82ec 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -87,6 +87,11 @@ import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
public class KafkaStreamingExtractor<S> extends FlushingExtractor<S,
DecodeableKafkaRecord> {
public static final String DATASET_KEY = "dataset";
public static final String DATASET_PARTITION_KEY = "datasetPartition";
+ public static final String MAX_KAFKA_BUFFER_SIZE_IN_BYTES =
"kafka.streaming.max.kafka.buffer.size.in.bytes";
+ public static final Long DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES =
Long.valueOf(50 * 1024 * 1024);
+ // Max number of records to be pulled in single polling.
+ private static final String KAFKA_MAX_POLL_RECORDS_KEY =
"kafka.consumer.maxPollRecords";
+ private static final int DEFAULT_MAX_POLL_RECORDS = 100;
private static final Long MAX_LOG_ERRORS = 100L;
private static final String
KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
@@ -115,6 +120,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
protected MultiLongWatermark lowWatermark;
protected MultiLongWatermark highWatermark;
protected MultiLongWatermark nextWatermark;
+ protected long maxAvgRecordSize = -1;
protected Map<Integer, DecodeableKafkaRecord>
perPartitionLastSuccessfulRecord;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
@@ -214,6 +220,19 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
public KafkaStreamingExtractor(WorkUnitState state) {
super(state);
+ this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
+ Map<KafkaPartition, LongWatermark> topicPartitionWatermarks =
getTopicPartitionWatermarks(this.topicPartitions);
+ if (this.maxAvgRecordSize > 0 ) {
+ long maxPollRecords =
+ state.getPropAsLong(MAX_KAFKA_BUFFER_SIZE_IN_BYTES,
DEFAULT_MAX_KAFKA_BUFFER_SIZE_IN_BYTES) / maxAvgRecordSize;
+ maxPollRecords = Math.min(maxPollRecords,
state.getPropAsInt(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS));
+ state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, maxPollRecords);
+ log.info("set max.poll.records to be " + maxPollRecords);
+ } else {
+ // As there is no avg record size available, using lower number to make
sure we don't hit OOM issue
+ state.setProp(KAFKA_MAX_POLL_RECORDS_KEY, DEFAULT_MAX_POLL_RECORDS);
+ log.info("set max.poll.records to be {}", DEFAULT_MAX_POLL_RECORDS);
+ }
this.kafkaConsumerClientResolver =
new
ClassAliasResolver<>(GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory.class);
try {
@@ -229,8 +248,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
this._schemaRegistry =
state.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(
KafkaSchemaRegistry.<String, S>get(state.getProperties())) :
Optional.<KafkaSchemaRegistry<String, S>>absent();
- this.topicPartitions = getTopicPartitionsFromWorkUnit(state);
- this.kafkaConsumerClient.assignAndSeek(topicPartitions,
getTopicPartitionWatermarks(this.topicPartitions));
+ this.kafkaConsumerClient.assignAndSeek(topicPartitions,
topicPartitionWatermarks);
this.messageIterator = this.kafkaConsumerClient.consume();
this.partitions = KafkaUtils.getPartitions(state);
@@ -292,6 +310,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
if (kafkaWatermarkMap.containsKey(topicPartitionString)) {
LongWatermark longWatermark = ((KafkaWatermark)
kafkaWatermarkMap.get(topicPartitionString)).getLwm();
longWatermarkMap.put(topicPartition, longWatermark);
+ maxAvgRecordSize = Math.max(maxAvgRecordSize, ((KafkaWatermark)
kafkaWatermarkMap.get(topicPartitionString)).getAvgRecordSize());
} else {
longWatermarkMap.put(topicPartition, new LongWatermark(0L));
}