agavra commented on code in PR #9516:
URL: https://github.com/apache/pinot/pull/9516#discussion_r986046996
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java:
##########
@@ -129,29 +130,39 @@ protected void pushAvroIntoKafka(List<File> avroFiles)
         "io.confluent.kafka.serializers.KafkaAvroSerializer");
     Producer<byte[], GenericRecord> avroProducer = new KafkaProducer<>(avroProducerProps);
+    // this producer produces intentionally malformed records so that
+    // we can test the behavior when consuming such records
     Properties nonAvroProducerProps = new Properties();
     nonAvroProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaPort());
     nonAvroProducerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
     nonAvroProducerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.ByteArraySerializer");
-    Producer<byte[], byte[]> nonAvroProducer = new KafkaProducer<>(nonAvroProducerProps);
+    Producer<byte[], byte[]> invalidDataProducer = new KafkaProducer<>(nonAvroProducerProps);
     if (injectTombstones()) {
       // publish lots of tombstones to livelock the consumer if it can't handle this properly
       for (int i = 0; i < 1000; i++) {
         // publish a tombstone first
-        nonAvroProducer.send(
+        avroProducer.send(
             new ProducerRecord<>(getKafkaTopic(), Longs.toByteArray(System.currentTimeMillis()), null));
       }
     }
+
     for (File avroFile : avroFiles) {
+      int numInvalidRecords = 0;
       try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
         for (GenericRecord genericRecord : reader) {
           byte[] keyBytes = (getPartitionColumn() == null) ? Longs.toByteArray(System.currentTimeMillis())
               : (genericRecord.get(getPartitionColumn())).toString().getBytes();
-          // Ignore getKafkaMessageHeader()
-          nonAvroProducer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, "Rubbish".getBytes(UTF_8)));
+
+          if (numInvalidRecords < NUM_INVALID_RECORDS) {
+            // send a few rubbish records to validate that the consumer will skip over non-avro records,
+            // but don't send them on every iteration, as that causes log spam
Review Comment:
I think we should - this will allow us to re-enable the logs (helpful for
debugging in the future), and the test should run faster since it produces
far less garbage to Kafka.
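The capping pattern the comment argues for can be sketched without a live Kafka broker. This is a minimal, hedged illustration: the `NUM_INVALID_RECORDS` value and the `sendRubbish` stub are assumptions standing in for the PR's constant and for `invalidDataProducer.send(...)`, not the test's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class InvalidRecordCapSketch {
  // hypothetical cap, mirroring the NUM_INVALID_RECORDS constant referenced in the diff
  static final int NUM_INVALID_RECORDS = 5;

  // stub standing in for invalidDataProducer.send(...): just collects payloads
  static final List<byte[]> sentRubbish = new ArrayList<>();

  static void sendRubbish(byte[] payload) {
    sentRubbish.add(payload);
  }

  public static void main(String[] args) {
    int numInvalidRecords = 0;
    // pretend each iteration is one Avro record read from the file
    for (int record = 0; record < 1000; record++) {
      if (numInvalidRecords < NUM_INVALID_RECORDS) {
        // only the first few records get a rubbish companion message,
        // so consumer-side decode-failure logs stay readable when re-enabled
        sendRubbish("Rubbish".getBytes(StandardCharsets.UTF_8));
        numInvalidRecords++;
      }
      // the real Avro record would be produced here regardless of the cap
    }
    // bounded by the cap, not by the number of records in the file
    System.out.println(sentRubbish.size());
  }
}
```

Because the counter is per-file and checked before each send, the amount of garbage produced stays constant no matter how many records each Avro file contains.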
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]