sv2000 commented on a change in pull request #2898: [GOBBLIN-1058] Refactor
method emitting GTE for ease of adding new tags
URL: https://github.com/apache/incubator-gobblin/pull/2898#discussion_r383616119
##########
File path:
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
##########
@@ -319,18 +319,52 @@ public void updateStatisticsForCurrentPartition(int
partitionIdx, long readStart
*/
public void emitTrackingEvents(MetricContext context, MultiLongWatermark
lowWatermark, MultiLongWatermark highWatermark,
MultiLongWatermark nextWatermark) {
+ emitTrackingEventsWithAdditionalTags(context, lowWatermark, highWatermark,
nextWatermark, Maps.newHashMap());
+ }
+
+ /**
+ * Emit Tracking events reporting the various statistics to be consumed by a
monitoring application, with additional
+ * map representing tags beyond what are constructed in {@link
#createTagsForPartition(int, MultiLongWatermark, MultiLongWatermark,
MultiLongWatermark) }
+ *
+ * Choose to not to make createTagsForPartition extensible to avoid
additional derived class just for additional k-v pairs
+ * in the tag maps.
+ *
+ * @param additionalTags caller-provided mapping from {@link KafkaPartition}
to {@link Map<String, String>}, which will
+ * be merged with result of {@link
#createTagsForPartition}.
+ */
+ public void emitTrackingEventsWithAdditionalTags(MetricContext context,
MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
+ MultiLongWatermark nextWatermark, Map<KafkaPartition, Map<String,
String>> additionalTags) {
+ Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
+ generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark,
additionalTags);
+
+ for (Map.Entry<KafkaPartition, Map<String, String>> eventTags :
tagsForPartitionsMap.entrySet()) {
+ EventSubmitter.Builder eventSubmitterBuilder = new
EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
+
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState,
KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
+
eventSubmitterBuilder.build().submit(KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME,
eventTags.getValue());
+ }
+ }
+
+ /**
+ * A helper function to merge tags for KafkaPartition. Separate into a
package-private method for ease of testing.
+ */
+ Map<KafkaPartition, Map<String, String>>
generateTagsForPartitions(MultiLongWatermark lowWatermark, MultiLongWatermark
highWatermark,
Review comment:
@VisibleforTesting?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services