Hello everyone,
I wanted to ask whether it would be possible, and whether you would consider it
useful, to add an option to the log4j KafkaAppender that sends the LogEvent time
as the timestamp of the Kafka record. I think it would be very useful for anyone
using Kafka as a log aggregator to be able to use the event time rather than the
time at which the record is sent.
Bear with me: I've only just started looking at the source code of KafkaAppender
and may be overlooking something in the broader scope of log4j.
As far as I can see in the source code, KafkaAppender.tryAppend() serializes the
event and hands it to KafkaManager, which sends it:
    private void tryAppend(final LogEvent event)
            throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data); // only the serialized bytes reach KafkaManager
    }
where manager.send() is implemented as follows (note the line where the
ProducerRecord is created):
    public void send(final byte[] msg)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;

            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
                        .replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }

            // the ProducerRecord is created here: topic, key and value only
            final ProducerRecord<byte[], byte[]> newRecord =
                    new ProducerRecord<>(topic, newKey, msg);
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {
                        if (e != null) {
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }
ProducerRecord, however, has other constructors with additional parameters; in
particular, I'm looking at:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
    public ProducerRecord(java.lang.String topic,
                          java.lang.Integer partition,
                          java.lang.Long timestamp,
                          K key,
                          V value)
This would allow us to set the timestamp to *LogEvent#getTimeMillis()*, but
it would also force us to specify the partition the record should be sent to.
However, KafkaProducer only falls back to the configured partitioner
(DefaultPartitioner, or the class set via the 'partitioner.class' property)
when the partition is null, so we could simply pass null.
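A minimal, self-contained model of the partition rule above (not Kafka's actual
code; SketchRecord, PartitionSketch, resolvePartition and withEventTime are
hypothetical names for illustration). SketchRecord mirrors the parameter order
of the five-argument ProducerRecord constructor. An explicit partition is used
as-is, while a null partition is handed to the configured partitioner, modelled
here as a plain function.

```java
import java.util.Objects;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for Kafka's ProducerRecord, with the same parameter
// order as the five-argument constructor: topic, partition, timestamp, key, value.
final class SketchRecord {
    final String topic;
    final Integer partition; // null = let the partitioner decide
    final Long timestamp;    // null = no explicit timestamp
    final byte[] key;
    final byte[] value;

    SketchRecord(String topic, Integer partition, Long timestamp, byte[] key, byte[] value) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

final class PartitionSketch {
    // Simplified model of the producer-side rule: an explicit partition wins,
    // a null partition is delegated to the configured partitioner.
    static int resolvePartition(SketchRecord rec, ToIntFunction<SketchRecord> partitioner) {
        return rec.partition != null ? rec.partition : partitioner.applyAsInt(rec);
    }

    // What the appender could build: no partition, event time as the timestamp.
    static SketchRecord withEventTime(String topic, long eventTimeMillis, byte[] key, byte[] value) {
        return new SketchRecord(topic, null, eventTimeMillis, key, value);
    }
}
```

Since withEventTime always leaves the partition null, records built this way
would still be distributed by DefaultPartitioner (or whatever
'partitioner.class' names), exactly as today.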
As for the interface, we could add a single flag to the KafkaAppender
definition, something like:
<Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
If the 'timestamp' flag is false, the ProducerRecord would be created with a
null timestamp, leaving the behaviour as it is right now.
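A sketch of the proposed flag, assuming a hypothetical boolean field backing the
'timestamp' attribute (TimestampSketch, useEventTime, chooseTimestamp and
effectiveTimestamp are illustrative names, not existing log4j or Kafka API).
chooseTimestamp returns LogEvent#getTimeMillis() when the flag is on and null
otherwise. effectiveTimestamp is a simplified model of what, as far as I know,
KafkaProducer does with a null timestamp: it stamps the record with the current
time when it is sent. A topic configured with
message.timestamp.type=LogAppendTime would still have the broker overwrite
either value.

```java
// Hypothetical sketch of the proposed timestamp="true" flag.
final class TimestampSketch {
    private final boolean useEventTime; // value of the proposed 'timestamp' attribute

    TimestampSketch(boolean useEventTime) {
        this.useEventTime = useEventTime;
    }

    // Timestamp to pass to the five-argument ProducerRecord constructor.
    // null keeps today's behaviour (same as the three-argument constructor).
    Long chooseTimestamp(long eventTimeMillis) {
        return useEventTime ? Long.valueOf(eventTimeMillis) : null;
    }

    // Simplified model of the producer's fallback: a null timestamp is
    // replaced by the wall-clock time at which the record is sent.
    static long effectiveTimestamp(Long recordTimestamp, long nowMillis) {
        return recordTimestamp != null ? recordTimestamp : nowMillis;
    }
}
```

With the flag off, the effective timestamp is the send time, as today; with it
on, events that sat in a buffer keep the time at which they were logged.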
What do you think about this? Has it already been discussed?
Thank you for your attention,
Federico