This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 67a060f [GOBBLIN-1254] Skip undecodeable message in
KafkaAvroJobStatusMonitor
67a060f is described below
commit 67a060fbe861c178ac8a4be494b440a234a96993
Author: zhchen <[email protected]>
AuthorDate: Wed Sep 2 11:52:56 2020 -0700
[GOBBLIN-1254] Skip undecodeable message in KafkaAvroJobStatusMonitor
Closes #3095 from zxcware/und
---
.../apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java | 10 ++++++++++
.../gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java | 2 +-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index 937c705..a4df1c5 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.runtime;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -243,6 +244,15 @@ public class KafkaAvroJobStatusMonitorTest {
ConsumerIterator<byte[], byte[]> iterator =
this.kafkaTestHelper.getIteratorForTopic(TOPIC);
MessageAndMetadata<byte[], byte[]> messageAndMetadata = iterator.next();
+ // Verify undecodeable message is skipped
+ byte[] undecodeableMessage = Arrays.copyOf(messageAndMetadata.message(),
+ messageAndMetadata.message().length - 1);
+ ConsumerRecord undecodeableRecord = new ConsumerRecord<>(TOPIC,
messageAndMetadata.partition(),
+ messageAndMetadata.offset(), messageAndMetadata.key(),
undecodeableMessage);
+ Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
0L);
+ jobStatusMonitor.processMessage(new
Kafka09ConsumerClient.Kafka09ConsumerRecord(undecodeableRecord));
+ Assert.assertEquals(jobStatusMonitor.getMessageParseFailures().getCount(),
1L);
+ // Test a normal event
jobStatusMonitor.processMessage(convertMessageAndMetadataToDecodableKafkaRecord(messageAndMetadata));
StateStore stateStore = jobStatusMonitor.getStateStore();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index c4b6070..e4ead68 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -95,7 +95,7 @@ public class KafkaAvroJobStatusMonitor extends
KafkaJobStatusMonitor {
try {
GobblinTrackingEvent decodedMessage = this.reader.get().read(null,
decoder);
return parseJobStatus(decodedMessage);
- } catch (AvroRuntimeException | IOException exc) {
+ } catch (Exception exc) {
this.messageParseFailures.mark();
if (this.messageParseFailures.getFiveMinuteRate() < 1) {
log.warn("Unable to decode input message.", exc);