This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c27dc  [GOBBLIN-1393] Set offset lag to 0 for topics with no 
previous watermark
b3c27dc is described below

commit b3c27dc1d52cd4ea7c039fbd6894ef19ccac5e0b
Author: vbohra <[email protected]>
AuthorDate: Sun Feb 21 12:04:46 2021 -0800

    [GOBBLIN-1393] Set offset lag to 0 for topics with no previous watermark
    
    Closes #3232 from vikrambohra/containerAllocation
---
 .../workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
index 98ec5be..006a5ea 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/ProduceRateAndLagBasedWorkUnitSizeEstimator.java
@@ -102,6 +102,8 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator 
implements KafkaWorkUni
       avgProduceRates = watermark.getAvgProduceRates();
       avgRecordSize = watermark.getAvgRecordSize();
       offsetLag = offsetLag - watermark.getLwm().getValue();
+    } else {
+      offsetLag = 0L;
     }
 
     double maxProduceRate = getMaxProduceRateUntilNextReplan(avgProduceRates,
@@ -120,7 +122,7 @@ public class ProduceRateAndLagBasedWorkUnitSizeEstimator 
implements KafkaWorkUni
         ((double) (offsetLag * avgRecordSize) / (catchUpSlaInHours * 3600 * 
ONE_MEGA_BYTE)) + (maxProduceRate
             * produceRateScalingFactor);
 
-    log.debug("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg 
Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
+    log.info("TopicPartiton: {}, Max produce rate: {}, Offset lag: {}, Avg 
Record size: {}, Target Consume Rate: {}, Min Workunit size: {}",
         topic + ":" + partition, maxProduceRate, offsetLag, avgRecordSize, 
targetConsumeRate, minWorkUnitSize);
     //Return the target consumption rate to catch up with incoming traffic and 
current lag, as the workunit size.
     return Math.max(targetConsumeRate, minWorkUnitSize);

Reply via email to