Hi Ralph,

thank you for your response, I'll be creating the issue shortly.



Il giorno mer 21 ago 2019 alle ore 13:37 Apache <[email protected]>
ha scritto:

> It hasn’t been discussed. Feel free to create a Jira issue and a pull
> request. Please make sure you include a test for your change.
>
> Ralph
>
> > On Aug 21, 2019, at 12:31 AM, Federico D'Ambrosio <[email protected]>
> wrote:
> >
> > Hello everyone,
> >
> > I wanted to discuss with you if it's possible or if you would consider
> > useful adding the possibility to send the LogEvent time as a timestamp
> for
> > the record when using the log4j KafkaAppender. I think it could be very
> > useful for everyone using Kafka as a log aggregator having the
> possibility
> > to use the event time, rather than the time the record is being sent.
> > Bear with me, I've just started looking at the souce code of
> KafkaAppender
> > and may overlook something in the broader scope of log4j.
> >
> > As far as I've seen in the source code, the message is sent by
> KafkaManager:
> >
> > 146    private void tryAppend(final LogEvent event) throws
> > ExecutionException, InterruptedException, TimeoutException {147
> > final Layout<? extends Serializable> layout = getLayout();148
> > byte[] data;149        if (layout instanceof SerializedLayout) {150
> >        final byte[] header = layout.getHeader();151            final
> > byte[] body = layout.toByteArray(event);152            data = new
> > byte[header.length + body.length];153
> > System.arraycopy(header, 0, data, 0, header.length);154
> > System.arraycopy(body, 0, data, header.length, body.length);155
> > } else {156            data = layout.toByteArray(event);157
> > }*158        manager.send(data);*159    }
> >
> > with manager.send() implemented this way, with highlighted the creation
> of
> > the ProducerRecord:
> >
> > 108    public void send(final byte[] msg) throws ExecutionException,
> > InterruptedException, TimeoutException {109        if (producer !=
> > null) {110            byte[] newKey = null;111112            if(key !=
> > null && key.contains("${")) {113                newKey =
> >
> getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);114
> >           } else if (key != null) {115                newKey =
> > key.getBytes(StandardCharsets.UTF_8);116            }117*118
> > final ProducerRecord<byte[], byte[]> newRecord = new
> > ProducerRecord<>(topic, newKey, msg);*119            if (syncSend)
> > {120                final Future<RecordMetadata> response =
> > producer.send(newRecord);121
> > response.get(timeoutMillis, TimeUnit.MILLISECONDS);122            }
> > else {123                producer.send(newRecord, new Callback() {124
> >                  @Override125                    public void
> > onCompletion(final RecordMetadata metadata, final Exception e) {126
> >                    if (e != null) {127
> > LOGGER.error("Unable to write to Kafka in appender [" + getName() +
> > "]", e);128                        }129                    }130
> >        });131            }132        }133    }
> >
> >
> > Now, ProducerRecord has the additional parameters, in particular, I'm
> > looking at:
> >
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
> > <
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
> >
> >
> > public ProducerRecord(java.lang.String topic,
> >                      java.lang.Integer partition,
> >                      java.lang.Long timestamp,
> >                      K
> > <
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > key,
> >                      V
> > <
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > value)
> >
> > which would allow us to set the timestamp as *LogEvent#getTimeMillis()*,
> > but would force us to also input the partition where the record should be
> > sent. Still, the logic behind the partitioning within the KafkaProducer
> is
> > so that if partition is null, then the defined partitioner will be used
> > (DefaultPartitioner or the one defined by the 'partitioner.class'
> > property), so, we could simply assign it as null.
> >
> > In terms of interface, we could add a single flag in the KafkaAppender
> > definition, something like:
> >
> > <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
> >
> > If the 'timestamp' flag is false, then the record would be sent with the
> > timestamp parameter of the ProducerRecord as null, leaving the behaviour
> as
> > it is right now.
> >
> > What do you think about that? Was this something which was already
> > discussed?
> >
> > Thank you for your attention,
> > Federico
>
>
>

-- 
Federico D'Ambrosio

Reply via email to