This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b3c27dc [GOBBLIN-1393] Set offset lag to 0 for topics with no previous watermark
b3c27dc is described below
commit b3c27dc1d52cd4ea7c039fbd6894ef19ccac5e0b
Author: vbohra <[email protected]>
AuthorDate: Sun Feb 21 12:04:46 2021 -0800
[GOBBLIN-1393] Set offset lag to 0 for topics with no previous watermark
Closes #3232 from vikrambohra/containerAllocation
---
.../workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
index 98ec5be..006a5ea 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -102,6 +102,8 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUni
avgProduceRates = watermark.getAvgProduceRates();
avgRecordSize = watermark.getAvgRecordSize();
offsetLag = offsetLag - watermark.getLwm().getValue();
+ } else {
+ offsetLag = 0L;
}
double maxProduceRate = getMaxProduceRateUntilNextReplan(avgProduceRates,
@@ -120,7 +122,7 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator implements KafkaWorkUni
((double) (offsetLag * avgRecordSize) / (catchUpSlaInHours * 3600 *
ONE_MEGA_BYTE)) + (maxProduceRate
* produceRateScalingFactor);
- log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
+ log.info("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize,
targetConsumeRate, minWorkUnitSize);
//Return the target consumption rate to catch up with incoming traffic and current lag, as the workunit size.
return Math.max(targetConsumeRate, minWorkUnitSize);