Repository: incubator-gobblin Updated Branches: refs/heads/master fcc4d412a -> 8e974ef09
[GOBBLIN-589] Add undecodable message count to Gobblin Kafka tracking event

Closes #2457 from cshen98/metrics2

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8e974ef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8e974ef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8e974ef0

Branch: refs/heads/master
Commit: 8e974ef094d2c3abb4da45a5d5806fda7ff52d0b
Parents: fcc4d41
Author: Carl Shen <[email protected]>
Authored: Mon Sep 17 16:27:59 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Sep 17 16:27:59 2018 -0700

----------------------------------------------------------------------
 .../source/extractor/extract/kafka/KafkaExtractor.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8e974ef0/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 664446f..b0d38b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -60,7 +60,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);
   protected static final int INITIAL_PARTITION_IDX = -1;
-  protected static final Integer MAX_LOG_DECODING_ERRORS = 5;
+  protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
   // Constants for event submission
   public static final String TOPIC = "topic";
@@ -70,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
   public static final String ELAPSED_TIME = "elapsedTime";
   public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+  public static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
   public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
   public static final String READ_RECORD_TIME = "readRecordTime";
@@ -87,7 +88,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final GobblinKafkaConsumerClient kafkaConsumerClient;
   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
-  protected final Map<KafkaPartition, Integer> decodingErrorCount;
+  protected final Map<KafkaPartition, Long> decodingErrorCount;
   private final Map<KafkaPartition, Double> avgMillisPerRecord;
   private final Map<KafkaPartition, Long> avgRecordSizes;
   private final Map<KafkaPartition, Long> elapsedTime;
@@ -233,8 +234,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
         this.undecodableMessageCount++;
         if (shouldLogError()) {
           LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), t);
-          incrementErrorCount();
         }
+        incrementErrorCount();
       }
     }
   }
@@ -339,7 +340,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     if (this.decodingErrorCount.containsKey(getCurrentPartition())) {
       this.decodingErrorCount.put(getCurrentPartition(), this.decodingErrorCount.get(getCurrentPartition()) + 1);
     } else {
-      this.decodingErrorCount.put(getCurrentPartition(), 1);
+      this.decodingErrorCount.put(getCurrentPartition(), 1L);
     }
   }
@@ -459,6 +460,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       tagsForPartition.put(READ_RECORD_TIME, "0");
     }
+    tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
+        Long.toString(this.decodingErrorCount.getOrDefault(partition, 0L)));
+
     // Commit avg time to pull a record for each partition
     if (this.avgMillisPerRecord.containsKey(partition)) {
       double avgMillis = this.avgMillisPerRecord.get(partition);
