sv2000 commented on a change in pull request #2898: [GOBBLIN-1058] Refactor 
method emitting GTE for ease of adding new tags 
URL: https://github.com/apache/incubator-gobblin/pull/2898#discussion_r383616119
 
 

 ##########
 File path: 
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
 ##########
 @@ -319,18 +319,52 @@ public void updateStatisticsForCurrentPartition(int 
partitionIdx, long readStart
    */
   public void emitTrackingEvents(MetricContext context, MultiLongWatermark 
lowWatermark, MultiLongWatermark highWatermark,
       MultiLongWatermark nextWatermark) {
+    emitTrackingEventsWithAdditionalTags(context, lowWatermark, highWatermark, 
nextWatermark, Maps.newHashMap());
+  }
+
+  /**
+   * Emit Tracking events reporting the various statistics to be consumed by a 
monitoring application, with additional
+   * map representing tags beyond what are constructed in {@link 
#createTagsForPartition(int, MultiLongWatermark, MultiLongWatermark, 
MultiLongWatermark) }
+   *
+   * Choose to not to make createTagsForPartition extensible to avoid 
additional derived class just for additional k-v pairs
+   * in the tag maps.
+   *
+   * @param additionalTags caller-provided mapping from {@link KafkaPartition} 
to {@link Map<String, String>}, which will
+   *                       be merged with result of {@link 
#createTagsForPartition}.
+   */
+  public void emitTrackingEventsWithAdditionalTags(MetricContext context, 
MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
+      MultiLongWatermark nextWatermark, Map<KafkaPartition, Map<String, 
String>> additionalTags) {
+    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
+        generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, 
additionalTags);
+
+    for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : 
tagsForPartitionsMap.entrySet()) {
+      EventSubmitter.Builder eventSubmitterBuilder = new 
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
+      
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
 KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+      
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME, 
eventTags.getValue());
+    }
+  }
+
+  /**
+   * A helper function to merge tags for KafkaPartition. Separate into a 
package-private method for ease of testing.
+   */
+  Map<KafkaPartition, Map<String, String>> 
generateTagsForPartitions(MultiLongWatermark lowWatermark, MultiLongWatermark 
highWatermark,
 
 Review comment:
   @VisibleforTesting?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to