shirshanka commented on a change in pull request #2802: GOBBLIN-930: Add number
of topic partitions of a topic in each workunit
URL: https://github.com/apache/incubator-gobblin/pull/2802#discussion_r360985779
##########
File path:
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
##########
@@ -363,14 +363,16 @@ private void createEmptyWorkUnitsForSkippedPartitions(Map<String, List<WorkUnit>
context.close();
List<WorkUnit> workUnits = Lists.newArrayList();
- for (KafkaPartition partition : topic.getPartitions()) {
+ List<KafkaPartition> topicPartitions = topic.getPartitions();
+ for (KafkaPartition partition : topicPartitions) {
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, state, topicSpecificState);
if (workUnit != null) {
// For disqualified topics, for each of its workunits set the high watermark to be the same
// as the low watermark, so that it will be skipped.
if (!topicQualified) {
skipWorkUnit(workUnit);
}
+ workUnit.setProp(NUM_TOPIC_PARTITIONS, topicPartitions.size());
Review comment:
Compute topicPartitions.size() once, outside the for loop, and reuse that value here?
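A minimal sketch of what the suggestion could look like, written without Gobblin dependencies so it runs on its own. The class name, the use of plain maps in place of WorkUnit, the string partitions, and the key string "numTopicPartitions" are all assumptions for illustration; the diff does not show the actual value of NUM_TOPIC_PARTITIONS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCountSketch {
    // Hypothetical key; the real NUM_TOPIC_PARTITIONS constant is not shown in the diff.
    static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";

    // Builds one property map per partition. Each map stands in for a WorkUnit,
    // and put(...) stands in for workUnit.setProp(...).
    static List<Map<String, Integer>> buildWorkUnits(List<String> topicPartitions) {
        // Read the size once, before the loop, and reuse it for every work unit.
        final int numTopicPartitions = topicPartitions.size();

        List<Map<String, Integer>> workUnits = new ArrayList<>();
        for (String partition : topicPartitions) {
            Map<String, Integer> workUnit = new HashMap<>();
            workUnit.put(NUM_TOPIC_PARTITIONS, numTopicPartitions);
            workUnits.add(workUnit);
        }
        return workUnits;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> units =
            buildWorkUnits(Arrays.asList("topic-0", "topic-1", "topic-2"));
        System.out.println(units.size() + " work units, each tagged with "
            + units.get(0).get(NUM_TOPIC_PARTITIONS) + " partitions");
    }
}
```

For typical List implementations size() is already cheap, so the change is probably more about readability than speed: the local variable makes it clear that every work unit of the topic gets the same value.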