Hi Ralph,

thank you for your response. I'll be creating the issue shortly.

On Wed, 21 Aug 2019 at 13:37, Apache <[email protected]> wrote:

> It hasn’t been discussed. Feel free to create a Jira issue and a pull
> request. Please make sure you include a test for your change.
>
> Ralph
>
> > On Aug 21, 2019, at 12:31 AM, Federico D'Ambrosio <[email protected]> wrote:
> >
> > Hello everyone,
> >
> > I wanted to ask whether it would be possible, and whether you would
> > consider it useful, to add the option of sending the LogEvent time as
> > the record timestamp when using the log4j KafkaAppender. I think it
> > would be very useful for everyone using Kafka as a log aggregator to be
> > able to use the event time, rather than the time at which the record is
> > sent.
> > Bear with me: I've only just started looking at the source code of
> > KafkaAppender and may be overlooking something in the broader scope of
> > log4j.
> >
> > As far as I've seen in the source code, the message is sent by
> > KafkaManager:
> >
> >     private void tryAppend(final LogEvent event)
> >             throws ExecutionException, InterruptedException, TimeoutException {
> >         final Layout<? extends Serializable> layout = getLayout();
> >         byte[] data;
> >         if (layout instanceof SerializedLayout) {
> >             final byte[] header = layout.getHeader();
> >             final byte[] body = layout.toByteArray(event);
> >             data = new byte[header.length + body.length];
> >             System.arraycopy(header, 0, data, 0, header.length);
> >             System.arraycopy(body, 0, data, header.length, body.length);
> >         } else {
> >             data = layout.toByteArray(event);
> >         }
> >         manager.send(data);
> >     }
> >
> > with manager.send() implemented as follows (the line that creates the
> > ProducerRecord is marked with a comment):
> >
> >     public void send(final byte[] msg)
> >             throws ExecutionException, InterruptedException, TimeoutException {
> >         if (producer != null) {
> >             byte[] newKey = null;
> >
> >             if (key != null && key.contains("${")) {
> >                 newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
> >                         .replace(key).getBytes(StandardCharsets.UTF_8);
> >             } else if (key != null) {
> >                 newKey = key.getBytes(StandardCharsets.UTF_8);
> >             }
> >
> >             // the ProducerRecord is created here
> >             final ProducerRecord<byte[], byte[]> newRecord =
> >                     new ProducerRecord<>(topic, newKey, msg);
> >             if (syncSend) {
> >                 final Future<RecordMetadata> response = producer.send(newRecord);
> >                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
> >             } else {
> >                 producer.send(newRecord, new Callback() {
> >                     @Override
> >                     public void onCompletion(final RecordMetadata metadata, final Exception e) {
> >                         if (e != null) {
> >                             LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
> >                         }
> >                     }
> >                 });
> >             }
> >         }
> >     }
> >
> > Now, ProducerRecord has constructors with additional parameters. In
> > particular, I'm looking at:
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
> >
> >     public ProducerRecord(java.lang.String topic,
> >                           java.lang.Integer partition,
> >                           java.lang.Long timestamp,
> >                           K key,
> >                           V value)
> >
> > which would allow us to set the timestamp to LogEvent#getTimeMillis(),
> > but would force us to also pass the partition the record should be sent
> > to. Still, the partitioning logic within the KafkaProducer is such that,
> > if partition is null, the configured partitioner is used
> > (DefaultPartitioner or the one defined by the 'partitioner.class'
> > property), so we could simply pass null.
> >
> > In terms of interface, we could add a single flag to the KafkaAppender
> > definition, something like:
> >
> >     <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
> >
> > If the 'timestamp' flag is false, the record would be sent with the
> > timestamp parameter of the ProducerRecord set to null, leaving the
> > behaviour as it is right now.
> >
> > What do you think about that? Has this already been discussed?
> >
> > Thank you for your attention,
> > Federico

--
Federico D'Ambrosio
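
For reference, the idea in the quoted message could be sketched roughly as
follows. This is only an illustration under stated assumptions, not the actual
patch: SketchRecord is a hypothetical stand-in that mirrors the argument order
of the five-argument ProducerRecord constructor (so the example runs without
kafka-clients on the classpath), and buildRecord and useEventTime are made-up
names for the record-building step of send() and for the proposed 'timestamp'
flag. The timestamp value in main is arbitrary.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative sketch of the proposed 'timestamp' flag; not log4j code. */
class TimestampFlagSketch {

    /**
     * Hypothetical stand-in for ProducerRecord<byte[], byte[]>, mirroring
     * ProducerRecord(topic, partition, timestamp, key, value).
     */
    static final class SketchRecord {
        final String topic;
        final Integer partition; // null: leave the choice to the partitioner
        final Long timestamp;    // null: leave the timestamp to the producer
        final byte[] key;
        final byte[] value;

        SketchRecord(String topic, Integer partition, Long timestamp,
                     byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.timestamp = timestamp;
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Builds the record the way a modified send() might. eventTimeMillis
     * stands for LogEvent#getTimeMillis(); useEventTime models the flag.
     */
    static SketchRecord buildRecord(String topic, String key, byte[] msg,
                                    long eventTimeMillis, boolean useEventTime) {
        byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
        // Flag off: pass null so the current behaviour is unchanged.
        Long timestamp = useEventTime ? Long.valueOf(eventTimeMillis) : null;
        // The partition is always null, so the configured partitioner decides.
        return new SketchRecord(topic, null, timestamp, keyBytes, msg);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        SketchRecord on = buildRecord("topic", null, payload, 1566372660000L, true);
        SketchRecord off = buildRecord("topic", null, payload, 1566372660000L, false);
        System.out.println("flag on:  timestamp=" + on.timestamp + ", partition=" + on.partition);
        System.out.println("flag off: timestamp=" + off.timestamp + ", partition=" + off.partition);
    }
}
```

In the real appender this would presumably also mean passing the event time
from tryAppend() down to send(), since send() currently receives only the
serialized bytes.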
