This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 67a060f  [GOBBLIN-1254] Skip undecodeable message in KafkaAvroJobStatusMonitor
67a060f is described below

commit 67a060fbe861c178ac8a4be494b440a234a96993
Author: zhchen <[email protected]>
AuthorDate: Wed Sep 2 11:52:56 2020 -0700

    [GOBBLIN-1254] Skip undecodeable message in KafkaAvroJobStatusMonitor
    
    Closes #3095 from zxcware/und
---
 .../apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java  | 10 ++++++++++
 .../gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java  |  2 +-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 937c705..a4df1c5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.runtime;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -243,6 +244,15 @@ public class KafkaAvroJobStatusMonitorTest {
     ConsumerIterator<byte[], byte[]> iterator = 
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
 
     MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+    // Verify undecodeable message is skipped
+    byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
+        messageAndMetadata.message().length - 1);
+    ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC, 
messageAndMetadata.partition(),
+        messageAndMetadata.offset(), messageAndMetadata.key(), 
undecodeableMessage);
+    Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
0L);
+    jobStatusMonitor.processMessage(new 
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+    Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(), 
1L);
+    // Test an normal event
     
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
 
     StateStore stateStore = jobStatusMonitor.getStateStore();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c4b6070..e4ead68 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -95,7 +95,7 @@ public class KafkaAvroJobStatusMonitor extends KafkaJobStatusMonitor {
     try {
       GobblinTrackingEvent decodedMessage = this.reader.get().read(null, 
decoder);
       return parseJobStatus(decodedMessage);
-    } catch (AvroRuntimeException | IOException exc) {
+    } catch (Exception exc) {
       this.messageParseFailures.mark();
       if (this.messageParseFailures.getFiveMinuteRate() < 1) {
         log.warn("Unable to decode input message.", exc);

Reply via email to