sajjad-moradi commented on code in PR #9051:
URL: https://github.com/apache/pinot/pull/9051#discussion_r926077254
##########
pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java:
##########
@@ -103,12 +107,37 @@ public void init(Map<String, String> props, Set<String>
fieldsToRead, String top
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
- Record avroRecord = (Record) _deserializer.deserialize(_topicName,
payload);
- return _avroRecordExtractor.extract(avroRecord, destination);
+ try {
+ Record avroRecord = (Record) _deserializer.deserialize(_topicName,
payload);
+ return _avroRecordExtractor.extract(avroRecord, destination);
+ } catch (RuntimeException e) {
+ ignoreOrRethrowException(e);
+ return null;
+ }
}
@Override
public GenericRow decode(byte[] payload, int offset, int length, GenericRow
destination) {
return decode(Arrays.copyOfRange(payload, offset, offset + length),
destination);
}
+
+ /**
+ * This method handles specific serialisation exceptions. If the exception
cannot be ignored the method
+ * re-throws the exception.
+ *
+ * @param e exception to handle
+ */
+ private void ignoreOrRethrowException(RuntimeException e) {
+ if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+ // Do nothing, the message is not an Avro message and can't be decoded
+ LOGGER.error("Caught exception while decoding row in topic {},
discarding row", _topicName, e);
+ return;
Review Comment:
What do you think of incrementing a counter metric here? Normally people
don't go through the logs, but they can get alerted on metrics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]