Hi, to follow up, I've created the JIRA here:
https://issues.apache.org/jira/browse/LOG4J2-2678
and implemented the changes on my branch.
However, I'm having trouble running the tests. Is there any resource I can
look at to see the correct way to add a new test?
I'm asking because I noticed that KafkaAppenderTest references a file named
KafkaAppenderTest.xml, which contains a log4j configuration, so I added a
new Kafka appender to that file:
<Configuration name="KafkaAppenderTest" status="OFF">
  <Appenders>
    <Kafka name="KafkaAppenderWithTimestamp" topic="kafka-topic" key="key">
      <PatternLayout pattern="%m"/>
      <Property name="timeout.ms">1000</Property>
      <Property name="bootstrap.servers">localhost:9092</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="KafkaAppenderWithTimestamp"/>
    </Root>
  </Loggers>
</Configuration>
but apparently I've done something wrong, because when I run the test I
added (which uses ctx.getRequiredAppender("KafkaAppenderWithTimestamp")),
I get this error:
Appender named KafkaAppenderWithTimestamp was null.
So evidently I'm missing something here.
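
For reference, the idea from the original proposal (quoted below) can be
sketched as follows. This is only an illustration, not the code on the
branch: RecordSketch is a stand-in for Kafka's ProducerRecord so that the
snippet compiles without kafka-clients, and TimestampSketch, buildRecord and
useEventTime are hypothetical names for the helper and for the proposed
'timestamp' flag.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>.
// It only mirrors the (topic, partition, timestamp, key, value) constructor shape.
final class RecordSketch {
    final String topic;
    final Integer partition; // null: leave the choice to the configured partitioner
    final Long timestamp;    // null: no explicit timestamp, i.e. current behaviour
    final byte[] key;
    final byte[] value;

    RecordSketch(final String topic, final Integer partition, final Long timestamp,
                 final byte[] key, final byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

final class TimestampSketch {
    // useEventTime is a hypothetical name for the proposed 'timestamp' flag.
    static RecordSketch buildRecord(final String topic, final byte[] key, final byte[] msg,
                                    final long eventTimeMillis, final boolean useEventTime) {
        // Pass the event time only when the flag is set; otherwise keep it null.
        final Long timestamp = useEventTime ? Long.valueOf(eventTimeMillis) : null;
        // The partition is always null in this sketch.
        return new RecordSketch(topic, null, timestamp, key, msg);
    }

    public static void main(final String[] args) {
        final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        final byte[] msg = "hello".getBytes(StandardCharsets.UTF_8);
        final long eventTime = 1566412440000L; // would come from LogEvent#getTimeMillis()
        System.out.println(buildRecord("kafka-topic", key, msg, eventTime, true).timestamp);
        System.out.println(buildRecord("kafka-topic", key, msg, eventTime, false).timestamp);
    }
}
```

With the real classes, send() would need the event time as an extra argument
(or the LogEvent itself), which is an assumption about the final signature
rather than something settled in this thread.
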
Thank you very much,
Federico
On Wed, Aug 21, 2019 at 20:34, Federico D'Ambrosio <[email protected]> wrote:
> Hi Ralph,
>
> thank you for your response, I'll be creating the issue shortly.
>
>
>
> On Wed, Aug 21, 2019 at 13:37, Apache <[email protected]> wrote:
>
>> It hasn’t been discussed. Feel free to create a Jira issue and a pull
>> request. Please make sure you include a test for your change.
>>
>> Ralph
>>
>> > On Aug 21, 2019, at 12:31 AM, Federico D'Ambrosio <[email protected]> wrote:
>> >
>> > Hello everyone,
>> >
>> > I wanted to ask whether it would be possible, and whether you would
>> > consider it useful, to add an option to send the LogEvent time as the
>> > record timestamp when using the log4j KafkaAppender. I think it would be
>> > very useful for anyone using Kafka as a log aggregator to be able to use
>> > the event time rather than the time at which the record is sent.
>> > Bear with me: I've only just started looking at the source code of
>> > KafkaAppender and may be overlooking something in the broader scope of
>> > log4j.
>> >
>> > As far as I can see in the source code, the message is sent by
>> > KafkaManager:
>> >
>> >     private void tryAppend(final LogEvent event)
>> >             throws ExecutionException, InterruptedException, TimeoutException {
>> >         final Layout<? extends Serializable> layout = getLayout();
>> >         byte[] data;
>> >         if (layout instanceof SerializedLayout) {
>> >             final byte[] header = layout.getHeader();
>> >             final byte[] body = layout.toByteArray(event);
>> >             data = new byte[header.length + body.length];
>> >             System.arraycopy(header, 0, data, 0, header.length);
>> >             System.arraycopy(body, 0, data, header.length, body.length);
>> >         } else {
>> >             data = layout.toByteArray(event);
>> >         }
>> >         manager.send(data); // the serialized event is handed to the manager here
>> >     }
>> >
>> > with manager.send() implemented as follows; note the creation of the
>> > ProducerRecord:
>> >
>> >     public void send(final byte[] msg)
>> >             throws ExecutionException, InterruptedException, TimeoutException {
>> >         if (producer != null) {
>> >             byte[] newKey = null;
>> >
>> >             if (key != null && key.contains("${")) {
>> >                 newKey = getLoggerContext().getConfiguration().getStrSubstitutor()
>> >                         .replace(key).getBytes(StandardCharsets.UTF_8);
>> >             } else if (key != null) {
>> >                 newKey = key.getBytes(StandardCharsets.UTF_8);
>> >             }
>> >
>> >             // The record is created here, with topic, key and value only.
>> >             final ProducerRecord<byte[], byte[]> newRecord =
>> >                     new ProducerRecord<>(topic, newKey, msg);
>> >             if (syncSend) {
>> >                 final Future<RecordMetadata> response = producer.send(newRecord);
>> >                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
>> >             } else {
>> >                 producer.send(newRecord, new Callback() {
>> >                     @Override
>> >                     public void onCompletion(final RecordMetadata metadata, final Exception e) {
>> >                         if (e != null) {
>> >                             LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
>> >                         }
>> >                     }
>> >                 });
>> >             }
>> >         }
>> >     }
>> >
>> >
>> > Now, ProducerRecord has constructors that take additional parameters. In
>> > particular, I'm looking at:
>> >
>> > https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-java.lang.Integer-java.lang.Long-K-V-
>> >
>> >     public ProducerRecord(java.lang.String topic,
>> >                           java.lang.Integer partition,
>> >                           java.lang.Long timestamp,
>> >                           K key,
>> >                           V value)
>> >
>> > which would allow us to set the timestamp to *LogEvent#getTimeMillis()*,
>> > but would also force us to pass the partition the record should be sent
>> > to. Still, the partitioning logic in KafkaProducer is such that, if the
>> > partition is null, the configured partitioner is used (DefaultPartitioner
>> > or the one set through the 'partitioner.class' property), so we could
>> > simply pass null.
>> >
>> > In terms of interface, we could add a single flag in the KafkaAppender
>> > definition, something like:
>> >
>> > <Kafka name="kafka-appender" topic="topic" timestamp="true"> </Kafka>
>> >
>> > If the 'timestamp' flag is false, the record would be sent with the
>> > ProducerRecord timestamp parameter set to null, leaving the behaviour as
>> > it is right now.
>> >
>> > What do you think? Has this already been discussed?
>> >
>> > Thank you for your attention,
>> > Federico
>>
>>
>>
>
> --
> Federico D'Ambrosio
>
--
Federico D'Ambrosio