Sonke,
  This issue seems serious.  Customers have raised a bug against our product, and I
suspect the bug is in the Apache Kafka clients library.
  I ran the Kafka reader without any SnapLogic-specific code.  There
were only about twenty messages in the topics.  The code consumed about
300MB of memory in about 2 hours.
  Please find the screenshot attached.
  Can we please get on a call and reach a conclusion?  I still maintain that it's
a bug in the kafka-clients library.

Thanks,



On Mon, Mar 4, 2019 at 8:33 PM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:

> Hi Syed,
>
> and you are sure that this memory is actually allocated? I still have my
> reservations about that metric, to be honest. Is there any way to connect to
> the process with, for example, jconsole and have a look at memory
> consumption in there?
> Or alternatively, since the code you have sent no longer relies on SnapLogic,
> can you just run it as a standalone application and check memory
> consumption?
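
A rough sketch of such a standalone check (not code from this thread; the
HeapReport class and its method names are hypothetical). It relies only on the
JDK's MemoryMXBean, which reports the heap that is in use right now, roughly
the figure jconsole plots:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.Locale;

// Hypothetical helper, for illustration only: reports the heap currently
// in use by this JVM, as opposed to a cumulative allocation counter.
final class HeapReport {
    private static final long MB = 1024L * 1024L;

    // Formats the used and committed heap sizes in whole megabytes.
    static String describe(MemoryUsage heap) {
        return String.format(Locale.ROOT, "heap used=%d MB, committed=%d MB",
                heap.getUsed() / MB, heap.getCommitted() / MB);
    }

    // Reads the current heap usage from the platform MemoryMXBean.
    static MemoryUsage currentHeap() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    }

    public static void main(String[] args) throws InterruptedException {
        // Three readings one second apart; a real test would sample for
        // hours while the consumer loop runs on another thread.
        for (int i = 0; i < 3; i++) {
            System.out.println(describe(currentHeap()));
            Thread.sleep(1_000);
        }
    }
}
```

If the "used" figure keeps falling back to about the same level after each
garbage collection, the process is probably not leaking heap.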
>
> That code looks very similar to what I ran (without knowing your input
> parameters for isSuggest et al., of course) and for me memory consumption
> stayed between 120 MB and 200 MB.
>
> Best regards,
> Sönke
>
>
> On Mon, Mar 4, 2019 at 1:44 PM Syed Mudassir Ahmed <
> syed.mudas...@gaianconsultants.com> wrote:
>
>> Sonke,
>>   Thanks again.
>>   Yes, I replaced the non-Kafka code on our end with a simple print
>> statement as follows:
>>
>> do {
>>     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
>>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>         if (!infiniteLoop && !oneTimeMode) {
>>             --msgCount;
>>             if (msgCount < 0) {
>>                 break;
>>             }
>>         }
>>         Debugger.doPrint("value read:<" + record.value() + ">");
>>         /*outputViews.write(new BinaryOutput() {
>>             @Override
>>             public Document getHeader() {
>>                 return generateHeader(record, oldHeader);
>>             }
>>
>>             @Override
>>             public void write(WritableByteChannel writeChannel) throws IOException {
>>                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>                     os.write(record.value());
>>                 }
>>             }
>>         });*/
>>         //The offset to commit should be the next offset of the current one,
>>         // according to the API
>>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>                 new OffsetAndMetadata(record.offset() + 1));
>>         //In suggest mode, we should not change the current offset
>>         if (isSyncCommit && isSuggest) {
>>             commitOffset(offsets);
>>             offsets.clear();
>>         }
>>     }
>> } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>
>>
>> Note: Debugger is a wrapper class that just writes the given string to a
>> local file using PrintStream's println() method.
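
A side note on the print statement in the loop above, offered as a sketch and
not as part of the original code: record.value() is a byte[], so concatenating
it into a string logs the array's identity, not the message text. Assuming
the payload is UTF-8 text (an assumption; the thread does not say), it could be
decoded first. PayloadText is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class PayloadText {
    // Decodes a record value for logging; assumes the payload is UTF-8 text.
    static String describe(byte[] value) {
        return value == null ? "null" : new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        // Concatenating the array directly prints its identity, e.g. [B@1b6d3586
        System.out.println("value read:<" + value + ">");
        // Decoding first prints the text: value read:<hello>
        System.out.println("value read:<" + describe(value) + ">");
    }
}
```

This does not affect the memory question either way; it only makes the debug
output readable.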
>>
>> And I don't see any difference in the metrics.  I still see the same huge
>> amount of memory allocated.
>>
>> See the image attached.
>>
>>
>> Thanks,
>>
>>
>>
>> On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
>> <soenke.lie...@opencore.com.invalid> wrote:
>>
>>> Hi Syed,
>>>
>>> let's keep it on the list for now so that everybody can participate :)
>>>
>>> The different .poll() method was just an unrelated observation. The
>>> main points of my mail were the question of whether you are looking
>>> at the correct metric, and the suggestion to replace the payload of
>>> your code with a println statement, to remove non-Kafka code from your
>>> program and make sure that the leak is not in there. Have you tried
>>> that?
>>>
>>> Best regards,
>>> Sönke
>>>
>>> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
>>> <syed.mudas...@gaianconsultants.com> wrote:
>>> >
>>> > Sonke,
>>> >   Thanks so much for the reply.  I used the new poll(Duration) method.
>>> > Still, I see the memory issue.
>>> >   Is there a way we can get on a one-on-one call and discuss this, please?
>>> > Let me know your availability.  I can share a Zoom meeting link.
>>> >
>>> > Thanks,
>>> >
>>> >
>>> >
>>> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
>>> > <soenke.lie...@opencore.com.invalid> wrote:
>>> >>
>>> >> Hi Syed,
>>> >>
>>> >> from your screenshot I assume that you are using SnapLogic to run your
>>> >> code (full disclosure: I do not have the faintest idea of this
>>> >> product!). I've just had a look at the docs and am a bit confused by
>>> >> their explanation of the metric that you point out in your image
>>> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
>>> >> number of bytes that were allocated by the Snap.  Note that this
>>> >> number does not reflect the amount of memory that was freed and it is
>>> >> not the peak memory usage of the Snap.  So, it is not necessarily a
>>> >> metric that can be used to estimate the required size of a Snaplex
>>> >> node.  Rather, the number provides an insight into how much memory had
>>> >> to be allocated to process all of the documents.  For example, if the
>>> >> total allocated was 5MB and the Snap processed 32 documents, then the
>>> >> Snap allocated roughly 164KB per document.  When combined with the
>>> >> other statistics, this number can help to identify the potential
>>> >> causes of performance issues."
>>> >> The part about not reflecting memory that was freed makes me somewhat
>>> >> doubtful whether this actually reflects how much memory the process
>>> >> currently holds.  Can you give some more insight there?
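
To make the distinction in the quoted documentation concrete, here is a small
sketch. It is an illustration under assumptions, not SnapLogic's actual metric:
on a HotSpot/OpenJDK JVM, com.sun.management.ThreadMXBean reports the
cumulative bytes a thread has allocated, a number that only ever grows, while
MemoryMXBean reports the heap held at this moment. The class and method names
are hypothetical.

```java
import java.lang.management.ManagementFactory;

// Sketch only: contrasts cumulative bytes allocated with heap in use.
// Assumes a HotSpot/OpenJDK JVM, where com.sun.management.ThreadMXBean exists.
final class AllocatedVersusHeld {
    // Publishing each buffer here keeps the JIT from removing the allocation.
    static volatile byte[] sink;

    // Bytes allocated so far by the current thread; freed memory is not subtracted.
    static long allocatedByCurrentThread() {
        com.sun.management.ThreadMXBean threads =
                (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
        return threads.getThreadAllocatedBytes(Thread.currentThread().getId());
    }

    // Heap occupied right now; this drops again after garbage collection.
    static long heapInUse() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getUsed();
    }

    // Allocates `count` short-lived buffers of `size` bytes each and
    // returns how many bytes the thread allocated while doing so.
    static long churn(int count, int size) {
        long before = allocatedByCurrentThread();
        for (int i = 0; i < count; i++) {
            sink = new byte[size]; // the previous buffer becomes garbage
        }
        sink = null;
        return allocatedByCurrentThread() - before;
    }

    public static void main(String[] args) {
        long allocated = churn(500, 1024 * 1024);
        System.out.println("allocated in total: " + allocated / (1024 * 1024) + " MB");
        System.out.println("heap in use now:    " + heapInUse() / (1024 * 1024) + " MB");
    }
}
```

On a typical run the first figure is around 500 MB while the second stays far
lower, which is how a counter like "Memory Allocated" could keep climbing
without the process holding on to that memory.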
>>> >>
>>> >> Apart from that, I just ran your code somewhat modified to make it
>>> >> work without dependencies for 2 hours and saw no unusual memory
>>> >> consumption, just a regular garbage collection sawtooth pattern. That
>>> >> being said, I had to replace your actual processing with a simple
>>> >> println, so if there is a memory leak in there I would of course not
>>> >> have noticed.
>>> >> I've uploaded the code I ran [1] for reference. For further analysis,
>>> >> maybe you could run something similar with just a println or noop and
>>> >> see if the symptoms persist, to localize the leak (if it exists).
>>> >>
>>> >> Also, two random observations on your code:
>>> >>
>>> >> KafkaConsumer.poll(long timeout) is deprecated; you should consider
>>> >> using the overloaded version with a Duration parameter instead.
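
For illustration, a minimal sketch of the timeout conversion this implies.
PollTimeout is a hypothetical helper, and the Kafka call appears only in a
comment so that the snippet runs without kafka-clients on the classpath:

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;

// Hypothetical helper, for illustration only: turns a millisecond timeout
// into the Duration that KafkaConsumer.poll(Duration) expects.
final class PollTimeout {
    static Duration fromMillis(long timeoutMs) {
        return Duration.ofMillis(timeoutMs);
    }

    public static void main(String[] args) {
        Duration timeout = fromMillis(500);
        // With kafka-clients available, the poll call would then read:
        //     ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
        System.out.println(timeout.toMillis());                                  // 500
        System.out.println(timeout.equals(Duration.of(500, ChronoUnit.MILLIS))); // true
    }
}
```

Duration.ofMillis(n) and Duration.of(n, ChronoUnit.MILLIS) produce equal
values, so either spelling works.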
>>> >>
>>> >> The comment at [2] seems to contradict the code that follows it, as the
>>> >> offsets are only changed when in suggest mode. But as I have no idea
>>> >> what suggest mode even is or what all this means, this observation may be
>>> >> miles off the point :)
>>> >>
>>> >> I hope that helps a little.
>>> >>
>>> >> Best regards,
>>> >> Sönke
>>> >>
>>> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
>>> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>>> >>
>>> >>
>>> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
>>> >> <syed.mudas...@gaianconsultants.com> wrote:
>>> >> >
>>> >> >
>>> >> > Thanks,
>>> >> >
>>> >> >
>>> >> >
>>> >> > ---------- Forwarded message ---------
>>> >> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
>>> >> > Subject: Apache Kafka Memory Leakage???
>>> >> > To: <us...@kafka.apache.org>
>>> >> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>> >> >
>>> >> >
>>> >> > Hi Team,
>>> >> >   I have a Java application based on the latest Apache Kafka version, 2.1.1.
>>> >> >   I have a consumer application that runs indefinitely to consume
>>> >> > messages whenever they are produced.
>>> >> >   Sometimes no messages are produced for hours.  Still, I see
>>> >> > that the memory allocated to the consumer program increases drastically.
>>> >> >   My code is as follows:
>>> >> >
>>> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
>>> >> >
>>> >> > Properties kafkaProperties = new Properties();
>>> >> >
>>> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>>> >> >
>>> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>>> >> >
>>> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
>>> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
>>> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
>>> >> > if (topics != null) {
>>> >> >     subscribeTopics(topics);
>>> >> > }
>>> >> >
>>> >> >
>>> >> >     boolean infiniteLoop = false;
>>> >> >     boolean oneTimeMode = false;
>>> >> >     int timeout = consumeTimeout;
>>> >> >     if (isSuggest) {
>>> >> >         //Configuration for suggest mode
>>> >> >         oneTimeMode = true;
>>> >> >         msgCount = 0;
>>> >> >         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>>> >> >     } else if (msgCount < 0) {
>>> >> >         infiniteLoop = true;
>>> >> >     } else if (msgCount == 0) {
>>> >> >         oneTimeMode = true;
>>> >> >     }
>>> >> >     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>>> >> >     do {
>>> >> >             ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>> >> >             for (final ConsumerRecord<byte[], byte[]> record : records) {
>>> >> >                 if (!infiniteLoop && !oneTimeMode) {
>>> >> >                     --msgCount;
>>> >> >                     if (msgCount < 0) {
>>> >> >                         break;
>>> >> >                     }
>>> >> >                 }
>>> >> >                 outputViews.write(new BinaryOutput() {
>>> >> >                     @Override
>>> >> >                     public Document getHeader() {
>>> >> >                         return generateHeader(record, oldHeader);
>>> >> >                     }
>>> >> >
>>> >> >                     @Override
>>> >> >                     public void write(WritableByteChannel writeChannel) throws IOException {
>>> >> >                         try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>> >> >                             os.write(record.value());
>>> >> >                         }
>>> >> >                     }
>>> >> >                 });
>>> >> >                 //The offset to commit should be the next offset of the current one,
>>> >> >                 // according to the API
>>> >> >                 offsets.put(new TopicPartition(record.topic(), record.partition()),
>>> >> >                         new OffsetAndMetadata(record.offset() + 1));
>>> >> >                 //In suggest mode, we should not change the current offset
>>> >> >                 if (isSyncCommit && isSuggest) {
>>> >> >                     commitOffset(offsets);
>>> >> >                     offsets.clear();
>>> >> >                 }
>>> >> >             }
>>> >> >      } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>> >> >
>>> >> >
>>> >> > See the screenshot below.  In about nineteen hours, it consumed just
>>> >> > 5 messages, but the memory allocated is 1.6GB.
>>> >> >
>>> >> >
>>> >> > Any clues on how to get rid of the memory issue?  Is there anything I need to
>>> >> > do in the program, or is it a bug in the Kafka library?
>>> >> >
>>> >> > Please reply ASAP.
>>> >> >
>>> >> >
>>> >> > Thanks,
>>> >> >
>>> >>
>>> >>
>>> >> --
>>> >> Sönke Liebau
>>> >> Partner
>>> >> Tel. +49 179 7940878
>>> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>
>>>
>>
>
>
