Hello everyone,

I would like to ask whether it would be possible, and whether you would
find it useful, to let the log4j KafkaAppender send the LogEvent time as
the timestamp of the Kafka record. I think anyone using Kafka as a log
aggregator would benefit from being able to use the event time rather
than the time at which the record is sent.
Bear with me: I've only just started looking at the source code of
KafkaAppender, so I may be overlooking something in the broader scope of
log4j.

As far as I've seen in the source code, the message is sent by KafkaManager:

    private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
        final Layout<? extends Serializable> layout = getLayout();
        byte[] data;
        if (layout instanceof SerializedLayout) {
            final byte[] header = layout.getHeader();
            final byte[] body = layout.toByteArray(event);
            data = new byte[header.length + body.length];
            System.arraycopy(header, 0, data, 0, header.length);
            System.arraycopy(body, 0, data, header.length, body.length);
        } else {
            data = layout.toByteArray(event);
        }
        manager.send(data);  // <-- hands the bytes to KafkaManager
    }

manager.send() is implemented as follows; the line to note is the one that
creates the ProducerRecord:

    public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
        if (producer != null) {
            byte[] newKey = null;

            if (key != null && key.contains("${")) {
                newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
            } else if (key != null) {
                newKey = key.getBytes(StandardCharsets.UTF_8);
            }

            final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);  // <-- ProducerRecord created here
            if (syncSend) {
                final Future<RecordMetadata> response = producer.send(newRecord);
                response.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                producer.send(newRecord, new Callback() {
                    @Override
                    public void onCompletion(final RecordMetadata metadata, final Exception e) {
                        if (e != null) {
                            LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
                        }
                    }
                });
            }
        }
    }


Now, ProducerRecord has other constructors that take additional
parameters. In particular, I'm looking at:
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-

public ProducerRecord(java.lang.String topic,
                      java.lang.Integer partition,
                      java.lang.Long timestamp,
                      K key,
                      V value)

which would allow us to set the timestamp to *LogEvent#getTimeMillis()*,
but also requires us to pass the partition the record should be sent to.
However, the partitioning logic in KafkaProducer is such that, if the
partition is null, the configured partitioner is used (DefaultPartitioner,
or the one set through the 'partitioner.class' property), so we could
simply pass null.
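
To make the idea concrete, here is a rough sketch of the record construction
I have in mind. It is not a patch: SketchRecord is a hypothetical stand-in
that only mirrors the two ProducerRecord constructor shapes discussed here,
so that the snippet compiles without kafka-clients, and withEventTime is a
name I made up. As far as I can tell, the three-argument ProducerRecord
constructor used today already leaves partition and timestamp null, which
is what the stand-in imitates.

```java
// Hypothetical stand-in for org.apache.kafka.clients.producer.ProducerRecord,
// reduced to the two constructor shapes discussed in this mail.
final class SketchRecord<K, V> {
    final String topic;
    final Integer partition; // null: the configured partitioner picks the partition
    final Long timestamp;    // null: the producer assigns the time of sending
    final K key;
    final V value;

    SketchRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    // Shape of the constructor currently used by send(): no partition, no timestamp.
    SketchRecord(String topic, K key, V value) {
        this(topic, null, null, key, value);
    }
}

final class EventTimeRecordSketch {
    // Builds a record carrying the event time while still passing a null
    // partition, so partitioning would behave as it does today.
    static SketchRecord<byte[], byte[]> withEventTime(
            String topic, byte[] key, byte[] msg, long eventTimeMillis) {
        return new SketchRecord<>(topic, null, eventTimeMillis, key, msg);
    }
}
```

In the real code, eventTimeMillis would come from LogEvent#getTimeMillis(),
which means send() would need to receive it in addition to the byte array.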

In terms of interface, we could add a single flag in the KafkaAppender
definition, something like:

<Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>

If the 'timestamp' flag is false, then the record would be sent with the
timestamp parameter of the ProducerRecord as null, leaving the behaviour as
it is right now.
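
The flag logic itself would be tiny. A minimal sketch, assuming a boolean
taken from the proposed 'timestamp' attribute (the class and method names
below are hypothetical and do not exist in log4j):

```java
final class EventTimestampFlagSketch {
    // Returns the value to pass as the ProducerRecord timestamp argument:
    // the event time when the flag is on, null otherwise (null keeps the
    // current behaviour, where the producer assigns the timestamp).
    static Long timestampFor(boolean sendEventTimestamp, long eventTimeMillis) {
        return sendEventTimestamp ? Long.valueOf(eventTimeMillis) : null;
    }
}
```

One caveat, going by the ProducerRecord javadoc: if the topic is configured
with LogAppendTime, the broker overwrites the record timestamp, so the flag
would only have a visible effect on topics using CreateTime.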

What do you think? Has this already been discussed?

Thank you for your attention,
Federico
