This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2d2b948 [GOBBLIN-1331] Add average record size to work unit state
2d2b948 is described below
commit 2d2b948a4ae9cbb47caae39327f9ad9e2b476893
Author: suvasude <[email protected]>
AuthorDate: Fri Dec 4 16:39:58 2020 -0800
[GOBBLIN-1331] Add average record size to work unit state
Closes #3166 from sv2000/avgRecordSize
---
.../packer/KafkaTopicGroupingWorkUnitPacker.java | 25 +++++++++++++---------
1 file changed, 15 insertions(+), 10 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index aeab8e0..ba5b608 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -68,6 +68,7 @@ import static
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
*/
@Slf4j
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
+ public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
private static final String TOPIC_PARTITION_DELIMITER = "-";
@@ -76,24 +77,25 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
// packed into a single workunit. For example, if the container capacity is
set to 10, and each topic partition has a
// weight of 1, then 10 partitions of the topic will be packed into a single
workunit. This configuration is topic-independent
// i.e. all topics will be assumed to have the same peak consumption rate
when set.
- public static final String CONTAINER_CAPACITY_KEY =
"gobblin.kafka.streaming.containerCapacity";
+ public static final String CONTAINER_CAPACITY_KEY = GOBBLIN_KAFKA_PREFIX +
"streaming.containerCapacity";
public static final double DEFAULT_CONTAINER_CAPACITY = 10;
//A boolean flag to enable per-topic container capacity, where "container
capacity" is as defined earlier. This
// configuration is useful in scenarios where the write performance can vary
significantly across topics due to differences
// in schema, as in the case of columnar formats such as ORC and Parquet.
When enabled, the bin packing algorithm uses
// historic consumption rates for a given topic as tracked by the ingestion
pipeline.
-
- public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY =
"gobblin.kafka.streaming.isPerTopicBinCapacityEnabled";
+ public static final String IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED_KEY =
GOBBLIN_KAFKA_PREFIX + "streaming.isPerTopicBinCapacityEnabled";
public static final Boolean DEFAULT_IS_PER_TOPIC_CONTAINER_CAPACITY_ENABLED
= false;
//A topic-specific config that controls the minimum number of containers for
that topic.
- public static final String MIN_CONTAINERS_FOR_TOPIC =
"gobblin.kafka.minContainersForTopic";
- public static final String PARTITION_WATERMARK =
"gobblin.kafka.partition.watermark";
- public static final String PACKING_START_TIME_MILLIS =
"gobblin.kafka.packer.packingStartTimeMillis";
- public static final String IS_STATS_BASED_PACKING_ENABLED_KEY =
"gobblin.kafka.streaming.isStatsBasedPackingEnabled";
+ public static final String MIN_CONTAINERS_FOR_TOPIC = GOBBLIN_KAFKA_PREFIX +
"minContainersForTopic";
+ public static final String PARTITION_WATERMARK = GOBBLIN_KAFKA_PREFIX +
"partition.watermark";
+
+ public static final String PACKING_START_TIME_MILLIS = GOBBLIN_KAFKA_PREFIX
+ "packer.packingStartTimeMillis";
+ public static final String IS_STATS_BASED_PACKING_ENABLED_KEY =
GOBBLIN_KAFKA_PREFIX + "streaming.isStatsBasedPackingEnabled";
public static final boolean DEFAULT_IS_STATS_BASED_PACKING_ENABLED = false;
- public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY =
"gobblin.kafka.streaming.containerCapacityComputationStrategy";
+ public static final String CONTAINER_CAPACITY_COMPUTATION_STRATEGY_KEY =
+ GOBBLIN_KAFKA_PREFIX + "streaming.containerCapacityComputationStrategy";
public static final String DEFAULT_CONTAINER_CAPACITY_COMPUTATION_STRATEGY =
ContainerCapacityComputationStrategy.MEDIAN.name();
public enum ContainerCapacityComputationStrategy {
@@ -113,9 +115,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
/**
* When indexing-packing is enabled, the number of partitions is important
for extractor to know
* how many kafka partitions need to be pulled from.
- *
* Set to public-static to share with Extractor.
- * TODO: Shall be changed to package-private
*/
public static final String NUM_PARTITIONS_ASSIGNED =
"gobblin.kafka.streaming.numPartitions";
//A derived metric that defines the default workunit size, in case of
workunit size cannot be estimated.
@@ -257,6 +257,11 @@ public class KafkaTopicGroupingWorkUnitPacker extends
KafkaWorkUnitPacker {
workUnit.setProp(PACKING_START_TIME_MILLIS,
this.packingStartTimeMillis);
workUnit.setProp(DEFAULT_WORKUNIT_SIZE_KEY, getDefaultWorkUnitSize());
workUnit.setProp(MIN_WORKUNIT_SIZE_KEY, getMinWorkUnitSize(workUnit));
+ // avgRecordSize is unknown when bootstrapping, so skip setting this
+ // and the ORC writer will use the default setting for the tuning feature.
+ if (watermark != null && watermark.getAvgRecordSize() > 0) {
+ workUnit.setProp(ConfigurationKeys.AVG_RECORD_SIZE,
watermark.getAvgRecordSize());
+ }
}
}
}