This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d2b948  [GOBBLIN-1331] Add average record size to work unit state[]
2d2b948 is described below

commit 2d2b948a4ae9cbb47caae39327f9ad9e2b476893
Author: suvasude <[email protected]>
AuthorDate: Fri Dec 4 16:39:58 2020 -0800

    [GOBBLIN-1331] Add average record size to work unit state[]
    
    Closes #3166 from sv2000/avgRecordSize
---
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index aeab8e0..ba5b608 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -68,6 +68,7 @@ import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
  */
 @Slf4j
 public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
+  public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
   private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
   private static final String TOPIC_PARTITION_DELIMITER = "-";
 
@@ -76,24 +77,25 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   // packed into a single workunit. For example, if the container capacity is 
set to 10, and each topic partition has a
   // weight of 1, then 10 partitions of the topic will be packed into a single 
workunit. This configuration is topic-independent
   // i.e. all topics will be assumed to have the same peak consumption rate 
when set.
-  public static final String CONTAINER_CAPACITY_KEY = 
"gobblin.kafka.streaming.containerCapacity";
+  public static final String CONTAINER_CAPACITY_KEY = GOBBLIN_KAFKA_PREFIX + 
"streaming.containerCapacity";
   public static final double DEFAULT_CONTAINER_CAPACITY = 10;
 
   //A boolean flag to enable per-topic container capacity, where "container 
capacity" is as defined earlier. This
   // configuration is useful in scenarios where the write performance can vary 
significantly across topics due to differences
   // in schema, as in the case of columnar formats such as ORC and Parquet. 
When enabled, the bin packing algorithm uses
   // historic consumption rates for a given topic as tracked by the ingestion 
pipeline.
-
-  public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY = 
"gobblin.kafka.streaming.isPerTopicBinCapacityEnabled";
+  public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY = 
GOBBLIN_KAFKA_PREFIX + "streaming.isPerTopicBinCapacityEnabled";
   public static final Boolean DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED 
= false;
 
   //A topic-specific config that controls the minimum number of containers for 
that topic.
-  public static final String MIN_CONTAINERS_FOR_TOPIC = 
"gobblin.kafka.minContainersForTopic";
-  public static final String PARTITION_WATERMARK = 
"gobblin.kafka.partition.watermark";
-  public static final String PACKING_START_TIME_MILLIS = 
"gobblin.kafka.packer.packingStartTimeMillis";
-  public static final String IS_STATS_BASED_PACKING_ENABLED_KEY = 
"gobblin.kafka.streaming.isStatsBasedPackingEnabled";
+  public static final String MIN_CONTAINERS_FOR_TOPIC = GOBBLIN_KAFKA_PREFIX + 
"minContainersForTopic";
+  public static final String PARTITION_WATERMARK = GOBBLIN_KAFKA_PREFIX + 
"partition.watermark";
+
+  public static final String PACKING_START_TIME_MILLIS = GOBBLIN_KAFKA_PREFIX 
+ "packer.packingStartTimeMillis";
+  public static final String IS_STATS_BASED_PACKING_ENABLED_KEY = 
GOBBLIN_KAFKA_PREFIX + "streaming.isStatsBasedPackingEnabled";
   public static final boolean DEFAULT_IS_STATS_BASED_PACKING_ENABLED = false;
-  public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY = 
"gobblin.kafka.streaming.containerCapacityComputationStrategy";
+  public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY =
+      GOBBLIN_KAFKA_PREFIX + "streaming.containerCapacityComputationStrategy";
   public static final String DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY = 
ContainerCapacityComputationStrategy.MEDIAN.name();
 
   public enum ContainerCapacityComputationStrategy {
@@ -113,9 +115,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   /**
    * When indexing-packing is enabled, the number of partitions is important 
for extractor to know
    * how many kafka partitions need to be pulled from.
-   *
    * Set to public-static to share with Extractor.
-   * TODO: Shall be changed to package-private
    */
   public static final String NUM_PARTITIONS_ASSIGNED = 
"gobblin.kafka.streaming.numPartitions";
   //A derived metric that defines the default workunit size, in case of 
workunit size cannot be estimated.
@@ -257,6 +257,11 @@ public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
         workUnit.setProp(PACKING_START_TIME_MILLIS, 
this.packingStartTimeMillis);
         workUnit.setProp(DEFAULT_WORKUNIT_SIZE_KEY, getDefaultWorkUnitSize());
         workUnit.setProp(MIN_WORKUNIT_SIZE_KEY, getMinWorkUnitSize(workUnit));
+        // avgRecordSize is unknown when bootstrapping. so skipping setting 
this
+        // and ORC writer will use the default setting for the tunning feature.
+        if (watermark != null && watermark.getAvgRecordSize() > 0) {
+          workUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE, 
watermark.getAvgRecordSize());
+        }
       }
     }
   }

Reply via email to