Sönke,
thanks again.
Yes, I replaced the non-Kafka code on our end with a simple print
statement, as follows:
do {
    ConsumerRecords<byte[], byte[]> records =
            consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
    for (final ConsumerRecord<byte[], byte[]> record : records) {
        if (!infiniteLoop && !oneTimeMode) {
            --msgCount;
            if (msgCount < 0) {
                break;
            }
        }
        Debugger.doPrint("value read:<" + record.value() + ">");
        /*outputViews.write(new BinaryOutput() {
            @Override
            public Document getHeader() {
                return generateHeader(record, oldHeader);
            }

            @Override
            public void write(WritableByteChannel writeChannel) throws IOException {
                try (OutputStream os = Channels.newOutputStream(writeChannel)) {
                    os.write(record.value());
                }
            }
        });*/
        // The offset to commit should be the next offset after the current one,
        // according to the API
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        // In suggest mode, we should not change the current offset
        if (isSyncCommit && isSuggest) {
            commitOffset(offsets);
            offsets.clear();
        }
    }
} while ((msgCount > 0 || infiniteLoop) && isRunning.get());
Note: Debugger is a wrapper class that just writes the given string
to a local file using PrintStream's println() method.
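For completeness, a minimal sketch of what that wrapper looks like (the
class layout and the log file path here are placeholders, not our exact
code):

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;

// Minimal sketch of the Debugger wrapper described above; the log file
// path is a placeholder, not our exact code.
public final class Debugger {
    private static PrintStream out;

    private Debugger() {
    }

    public static synchronized void doPrint(String message) {
        try {
            if (out == null) {
                // Open the local log file once, in append mode.
                out = new PrintStream(
                        new FileOutputStream("/tmp/consumer-debug.log", true));
            }
            out.println(message);
        } catch (FileNotFoundException e) {
            // Best effort only: debug output must never break the consumer loop.
        }
    }
}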
And I don't see any difference in the metrics; I still see a huge amount
of memory allocated.
See the image attached.
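To double-check whether that number reflects memory that is actually
retained (as opposed to allocated and then freed again), I am thinking of
also logging the live heap usage once per loop iteration; a rough sketch
of the idea (the tag and units are just my choice):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

// Sketch: log the JVM's live heap usage so retained memory can be told
// apart from memory that was allocated and then freed. An allocation
// counter keeps growing in both cases; the "used" heap keeps returning
// to a low baseline after GC unless memory is actually being retained.
public final class HeapLogger {
    private static final MemoryMXBean MEMORY = ManagementFactory.getMemoryMXBean();

    public static void logHeap(String tag) {
        MemoryUsage heap = MEMORY.getHeapMemoryUsage();
        Debugger.doPrint(tag + ": heap used=" + heap.getUsed() / (1024 * 1024)
                + "MB, committed=" + heap.getCommitted() / (1024 * 1024) + "MB");
    }
}

If "used" keeps returning to a low baseline while the Snap's "Memory
Allocated" keeps climbing, the metric is counting allocations rather than
showing a leak.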
Thanks,
On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
<[email protected]> wrote:
> Hi Syed,
>
> let's keep it on the list for now so that everybody can participate :)
>
> The different .poll() method was just an unrelated observation, the
> main points of my mail were the question about whether this is the
> correct metric you are looking at and replacing the payload of your
> code with a println statement to remove non-Kafka code from your
> program and make sure that the leak is not in there. Have you tried
> that?
>
> Best regards,
> Sönke
>
> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
> <[email protected]> wrote:
> >
> > Sönke,
> > Thanks so much for the reply. I used the new poll(Duration) method.
> Still, I see the memory issue.
> > Is there a way we can get on a one-on-one call to discuss this, please?
> Let me know your availability; I can share a Zoom meeting link.
> >
> > Thanks,
> >
> >
> >
> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
> > <[email protected]>
> wrote:
> >>
> >> Hi Syed,
> >>
> >> from your screenshot I assume that you are using SnapLogic to run your
> >> code (full disclosure: I do not have the faintest idea of this
> >> product!). I've just had a look at the docs and am a bit confused by
> >> their explanation of the metric that you point out in your image
> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
> >> number of bytes that were allocated by the Snap. Note that this
> >> number does not reflect the amount of memory that was freed and it is
> >> not the peak memory usage of the Snap. So, it is not necessarily a
> >> metric that can be used to estimate the required size of a Snaplex
> >> node. Rather, the number provides an insight into how much memory had
> >> to be allocated to process all of the documents. For example, if the
> >> total allocated was 5MB and the Snap processed 32 documents, then the
> >> Snap allocated roughly 164KB per document. When combined with the
> >> other statistics, this number can help to identify the potential
> >> causes of performance issues."
> >> The part about not reflecting memory that was freed makes me somewhat
> >> doubtful whether this actually reflects how much memory the process
> >> currently holds. Can you give some more insight there?
> >>
> >> Apart from that, I just ran your code somewhat modified to make it
> >> work without dependencies for 2 hours and saw no unusual memory
> >> consumption, just a regular garbage collection sawtooth pattern. That
> >> being said, I had to replace your actual processing with a simple
> >> println, so if there is a memory leak in there I would of course not
> >> have noticed.
> >> I've uploaded the code I ran [1] for reference. For further analysis,
> >> maybe you could run something similar with just a println or noop and
> >> see if the symptoms persist, to localize the leak (if it exists).
> >>
> >> Also, two random observations on your code:
> >>
> >> KafkaConsumer.poll(Long timeout) is deprecated, you should consider
> >> using the overloaded version with a Duration parameter instead.
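> >>
> >> For example, just the mechanical change (assuming your timeout is in
> >> milliseconds):
> >>
> >> // deprecated:
> >> ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> >> // preferred since Kafka 2.0:
> >> ConsumerRecords<byte[], byte[]> records =
> >>         consumer.poll(Duration.ofMillis(timeout));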
> >>
> >> The comment at [2] seems to contradict the following code, as the
> >> offsets are only changed when in suggest mode. But as I have no idea
> >> what suggest mode even is, or what all of this means, this observation
> >> may be miles off point :)
> >>
> >> I hope that helps a little.
> >>
> >> Best regards,
> >> Sönke
> >>
> >> [1]
> https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
> >> [2]
> https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
> >>
> >>
> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
> >> <[email protected]> wrote:
> >> >
> >> >
> >> > Thanks,
> >> >
> >> >
> >> >
> >> > ---------- Forwarded message ---------
> >> > From: Syed Mudassir Ahmed <[email protected]>
> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
> >> > Subject: Apache Kafka Memory Leakage???
> >> > To: <[email protected]>
> >> > Cc: Syed Mudassir Ahmed <[email protected]>
> >> >
> >> >
> >> > Hi Team,
> >> > I have a Java application based on the latest Apache Kafka version,
> 2.1.1.
> >> > I have a consumer application that runs indefinitely, consuming
> messages whenever they are produced.
> >> > Sometimes no messages are produced for hours. Still, I see that the
> memory allocated to the consumer program increases drastically.
> >> > My code is as follows:
> >> >
> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
> >> >
> >> > Properties kafkaProperties = new Properties();
> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
> >> > if (topics != null) {
> >> >     subscribeTopics(topics);
> >> > }
> >> >
> >> > boolean infiniteLoop = false;
> >> > boolean oneTimeMode = false;
> >> > int timeout = consumeTimeout;
> >> > if (isSuggest) {
> >> >     // Configuration for suggest mode
> >> >     oneTimeMode = true;
> >> >     msgCount = 0;
> >> >     timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
> >> > } else if (msgCount < 0) {
> >> >     infiniteLoop = true;
> >> > } else if (msgCount == 0) {
> >> >     oneTimeMode = true;
> >> > }
> >> > Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
> >> > do {
> >> >     ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> >> >     for (final ConsumerRecord<byte[], byte[]> record : records) {
> >> >         if (!infiniteLoop && !oneTimeMode) {
> >> >             --msgCount;
> >> >             if (msgCount < 0) {
> >> >                 break;
> >> >             }
> >> >         }
> >> >         outputViews.write(new BinaryOutput() {
> >> >             @Override
> >> >             public Document getHeader() {
> >> >                 return generateHeader(record, oldHeader);
> >> >             }
> >> >
> >> >             @Override
> >> >             public void write(WritableByteChannel writeChannel) throws IOException {
> >> >                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
> >> >                     os.write(record.value());
> >> >                 }
> >> >             }
> >> >         });
> >> >         // The offset to commit should be the next offset after the current one,
> >> >         // according to the API
> >> >         offsets.put(new TopicPartition(record.topic(), record.partition()),
> >> >                 new OffsetAndMetadata(record.offset() + 1));
> >> >         // In suggest mode, we should not change the current offset
> >> >         if (isSyncCommit && isSuggest) {
> >> >             commitOffset(offsets);
> >> >             offsets.clear();
> >> >         }
> >> >     }
> >> > } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
> >> >
> >> >
> >> > See the screenshot below. In about nineteen hours it consumed just
> 5 messages, but the memory allocated is 1.6GB.
> >> >
> >> >
> >> > Any clues on how to get rid of the memory issue? Is there anything I
> need to do in the program, or is it a bug in the Kafka library?
> >> >
> >> > Please revert ASAP.
> >> >
> >> >
> >> > Thanks,
> >> >
> >>
> >>
> >> --
> >> Sönke Liebau
> >> Partner
> >> Tel. +49 179 7940878
> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>