Sönke,
This issue seems serious. A customer has raised a bug against our product, and I suspect the bug is in the Apache kafka-clients library. I executed the Kafka reader without any SnapLogic-specific code. There were hardly twenty messages in the topics, yet the code consumed about 300MB of memory in about two hours. Please find the screenshot attached. Can we please get on a call and arrive at a conclusion? I still argue it's a bug in the kafka-clients library.
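
If it helps to cross-check outside of the platform metrics, the standalone runner can log live heap usage from inside the process while the consumer loop runs. A quick sketch (the one-minute interval and logging to stdout are arbitrary choices on my side):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import java.lang.management.MemoryUsage;

    // Daemon thread that periodically prints live heap usage, to compare
    // against the "Memory Allocated" numbers reported by the platform.
    Thread heapLogger = new Thread(() -> {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        while (!Thread.currentThread().isInterrupted()) {
            MemoryUsage heap = memoryBean.getHeapMemoryUsage();
            System.out.printf("heap used=%dMB committed=%dMB%n",
                    heap.getUsed() >> 20, heap.getCommitted() >> 20);
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
    heapLogger.setDaemon(true);
    heapLogger.start();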
Thanks,

On Mon, Mar 4, 2019 at 8:33 PM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:

> Hi Syed,
>
> and you are sure that this memory is actually allocated? I still have my
> reservations about that metric, to be honest. Is there any way to connect
> to the process with, for example, jconsole and have a look at memory
> consumption in there?
> Or alternatively, since the code you have sent is not relying on SnapLogic
> anymore, can you just run it as a standalone application and check memory
> consumption?
>
> That code looks very similar to what I ran (without knowing your input
> parameters for isSuggest et al., of course), and for me memory consumption
> stayed between 120MB and 200MB.
>
> Best regards,
> Sönke
>
>
> On Mon, Mar 4, 2019 at 1:44 PM Syed Mudassir Ahmed <
> syed.mudas...@gaianconsultants.com> wrote:
>
>> Sönke,
>> thanks again.
>> Yes, I replaced the non-Kafka code on our end with a simple sysout
>> statement, as follows:
>>
>> do {
>>     ConsumerRecords<byte[], byte[]> records =
>>         consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
>>     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>         if (!infiniteLoop && !oneTimeMode) {
>>             --msgCount;
>>             if (msgCount < 0) {
>>                 break;
>>             }
>>         }
>>         Debugger.doPrint("value read:<" + record.value() + ">");
>>         /*outputViews.write(new BinaryOutput() {
>>             @Override
>>             public Document getHeader() {
>>                 return generateHeader(record, oldHeader);
>>             }
>>
>>             @Override
>>             public void write(WritableByteChannel writeChannel)
>>                     throws IOException {
>>                 try (OutputStream os =
>>                         Channels.newOutputStream(writeChannel)) {
>>                     os.write(record.value());
>>                 }
>>             }
>>         });*/
>>         //The offset to commit should be the next offset of the current
>>         // one, according to the API
>>         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>                 new OffsetAndMetadata(record.offset() + 1));
>>         //In suggest mode, we should not change the current offset
>>         if (isSyncCommit && isSuggest) {
>>             commitOffset(offsets);
>>             offsets.clear();
>>         }
>>     }
>> } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>
>> *Note:* Debugger is a wrapper class that just writes the given string to
>> a local file using PrintStream's println() method.
>>
>> And I don't see any difference in the metrics. I still see the huge
>> amount of memory allocated.
>>
>> See the image attached.
>>
>> Thanks,
>>
>>
>> On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
>> <soenke.lie...@opencore.com.invalid> wrote:
>>
>>> Hi Syed,
>>>
>>> let's keep it on the list for now so that everybody can participate :)
>>>
>>> The different .poll() method was just an unrelated observation; the
>>> main points of my mail were the question about whether this is the
>>> correct metric you are looking at, and replacing the payload of your
>>> code with a println statement to remove non-Kafka code from your
>>> program and make sure that the leak is not in there. Have you tried
>>> that?
>>>
>>> Best regards,
>>> Sönke
>>>
>>> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
>>> <syed.mudas...@gaianconsultants.com> wrote:
>>> >
>>> > Sönke,
>>> > Thanks so much for the reply. I used the new poll(Duration) method.
>>> > Still, I see the memory issue.
>>> > Is there a way we can get on a one-on-one call and discuss this,
>>> > please? Let me know your availability. I can share a Zoom meeting link.
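>>> >
>>> > For reference, the switch was along these lines (a sketch of just the
>>> > relevant line; the Duration overload is available since Kafka 2.0):
>>> >
>>> >     // old, deprecated overload:
>>> >     // ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>> >     // new overload taking a Duration:
>>> >     ConsumerRecords<byte[], byte[]> records =
>>> >             consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));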
>>> >
>>> > Thanks,
>>> >
>>> >
>>> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
>>> > <soenke.lie...@opencore.com.invalid> wrote:
>>> >>
>>> >> Hi Syed,
>>> >>
>>> >> from your screenshot I assume that you are using SnapLogic to run your
>>> >> code (full disclosure: I do not have the faintest idea of this
>>> >> product!). I've just had a look at the docs and am a bit confused by
>>> >> their explanation of the metric that you point out in your image,
>>> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
>>> >> number of bytes that were allocated by the Snap. Note that this
>>> >> number does not reflect the amount of memory that was freed and it is
>>> >> not the peak memory usage of the Snap. So, it is not necessarily a
>>> >> metric that can be used to estimate the required size of a Snaplex
>>> >> node. Rather, the number provides an insight into how much memory had
>>> >> to be allocated to process all of the documents. For example, if the
>>> >> total allocated was 5MB and the Snap processed 32 documents, then the
>>> >> Snap allocated roughly 164KB per document. When combined with the
>>> >> other statistics, this number can help to identify the potential
>>> >> causes of performance issues."
>>> >> The part about not reflecting memory that was freed makes me somewhat
>>> >> doubtful whether this actually reflects how much memory the process
>>> >> currently holds. Can you give some more insight there?
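>>> >>
>>> >> To illustrate why a "bytes allocated" counter can grow without bound
>>> >> while live memory stays flat: HotSpot keeps a cumulative per-thread
>>> >> allocation counter that only ever increases, even though most of those
>>> >> allocations die young and are reclaimed by the GC. A small sketch (the
>>> >> cast is HotSpot-specific, so treat it as an illustration, not portable
>>> >> code):
>>> >>
>>> >>     import java.lang.management.ManagementFactory;
>>> >>
>>> >>     public class AllocatedVsLive {
>>> >>         public static void main(String[] args) {
>>> >>             com.sun.management.ThreadMXBean tmx =
>>> >>                 (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
>>> >>             long id = Thread.currentThread().getId();
>>> >>             long before = tmx.getThreadAllocatedBytes(id);
>>> >>             byte[] garbage = new byte[1 << 20]; // 1MB, garbage immediately
>>> >>             garbage = null;
>>> >>             long after = tmx.getThreadAllocatedBytes(id);
>>> >>             // The delta is at least 1MB even though live heap did not grow.
>>> >>             System.out.println("allocated delta: " + (after - before) + " bytes");
>>> >>         }
>>> >>     }
>>> >>
>>> >> If the metric is computed like that, a steadily growing number would
>>> >> be expected and would not by itself indicate a leak.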
>>> >>
>>> >> Apart from that, I just ran your code, somewhat modified to make it
>>> >> work without dependencies, for 2 hours and saw no unusual memory
>>> >> consumption, just a regular garbage collection sawtooth pattern. That
>>> >> being said, I had to replace your actual processing with a simple
>>> >> println, so if there is a memory leak in there I would of course not
>>> >> have noticed.
>>> >> I've uploaded the code I ran [1] for reference. For further analysis,
>>> >> maybe you could run something similar with just a println or noop and
>>> >> see if the symptoms persist, to localize the leak (if it exists).
>>> >>
>>> >> Also, two random observations on your code:
>>> >>
>>> >> KafkaConsumer.poll(long timeout) is deprecated; you should consider
>>> >> using the overloaded version with a Duration parameter instead.
>>> >>
>>> >> The comment at [2] seems to contradict the following code, as the
>>> >> offsets are only committed when in suggest mode. But as I have no idea
>>> >> what suggest mode even is or what all this means, this observation may
>>> >> be miles off the point :)
>>> >>
>>> >> I hope that helps a little.
>>> >>
>>> >> Best regards,
>>> >> Sönke
>>> >>
>>> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
>>> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
>>> >>
>>> >>
>>> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
>>> >> <syed.mudas...@gaianconsultants.com> wrote:
>>> >> >
>>> >> >
>>> >> > Thanks,
>>> >> >
>>> >> >
>>> >> >
>>> >> > ---------- Forwarded message ---------
>>> >> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
>>> >> > Subject: Apache Kafka Memory Leakage???
>>> >> > To: <us...@kafka.apache.org>
>>> >> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
>>> >> >
>>> >> >
>>> >> > Hi Team,
>>> >> > I have a Java application based on the latest Apache Kafka version,
>>> >> > 2.1.1.
>>> >> > I have a consumer application that runs infinitely to consume
>>> >> > messages whenever produced.
>>> >> > Sometimes there are no messages produced for hours. Still, I see
>>> >> > that the memory allocated to the consumer program is drastically
>>> >> > increasing.
>>> >> > My code is as follows:
>>> >> >
>>> >> > AtomicBoolean isRunning = new AtomicBoolean(true);
>>> >> >
>>> >> > Properties kafkaProperties = new Properties();
>>> >> > kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
>>> >> > kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
>>> >> > kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
>>> >> > kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>> >> > kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
>>> >> > consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
>>> >> > if (topics != null) {
>>> >> >     subscribeTopics(topics);
>>> >> > }
>>> >> >
>>> >> > boolean infiniteLoop = false;
>>> >> > boolean oneTimeMode = false;
>>> >> > int timeout = consumeTimeout;
>>> >> > if (isSuggest) {
>>> >> >     //Configuration for suggest mode
>>> >> >     oneTimeMode = true;
>>> >> >     msgCount = 0;
>>> >> >     timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
>>> >> > } else if (msgCount < 0) {
>>> >> >     infiniteLoop = true;
>>> >> > } else if (msgCount == 0) {
>>> >> >     oneTimeMode = true;
>>> >> > }
>>> >> > Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
>>> >> > do {
>>> >> >     ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>>> >> >     for (final ConsumerRecord<byte[], byte[]> record : records) {
>>> >> >         if (!infiniteLoop && !oneTimeMode) {
>>> >> >             --msgCount;
>>> >> >             if (msgCount < 0) {
>>> >> >                 break;
>>> >> >             }
>>> >> >         }
>>> >> >         outputViews.write(new BinaryOutput() {
>>> >> >             @Override
>>> >> >             public Document getHeader() {
>>> >> >                 return generateHeader(record, oldHeader);
>>> >> >             }
>>> >> >
>>> >> >             @Override
>>> >> >             public void write(WritableByteChannel writeChannel) throws IOException {
>>> >> >                 try (OutputStream os = Channels.newOutputStream(writeChannel)) {
>>> >> >                     os.write(record.value());
>>> >> >                 }
>>> >> >             }
>>> >> >         });
>>> >> >         //The offset to commit should be the next offset of the current one,
>>> >> >         // according to the API
>>> >> >         offsets.put(new TopicPartition(record.topic(), record.partition()),
>>> >> >                 new OffsetAndMetadata(record.offset() + 1));
>>> >> >         //In suggest mode, we should not change the current offset
>>> >> >         if (isSyncCommit && isSuggest) {
>>> >> >             commitOffset(offsets);
>>> >> >             offsets.clear();
>>> >> >         }
>>> >> >     }
>>> >> > } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
>>> >> >
>>> >> > See the screenshot below. In about nineteen hours it consumed just
>>> >> > 5 messages, but the memory allocated is 1.6GB.
>>> >> >
>>> >> > Any clues on how to get rid of the memory issue? Anything I need to
>>> >> > do in the program, or is it a bug in the Kafka library?
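>>> >> >
>>> >> > For completeness, the consumer is shut down along these lines (a
>>> >> > simplified sketch; runPollLoop() stands in for the do/while loop
>>> >> > above, and the stop() handler is a simplification of our wrapper
>>> >> > code, not Kafka API):
>>> >> >
>>> >> >     // called from another thread to stop consumption
>>> >> >     public void stop() {
>>> >> >         isRunning.set(false);
>>> >> >         consumer.wakeup(); // breaks out of a blocking poll()
>>> >> >     }
>>> >> >
>>> >> >     // in the consuming thread
>>> >> >     try {
>>> >> >         runPollLoop(); // the do/while loop above
>>> >> >     } catch (WakeupException e) {
>>> >> >         // expected during shutdown
>>> >> >     } finally {
>>> >> >         consumer.close(); // releases buffers, sockets and the heartbeat thread
>>> >> >     }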
>>> >> >
>>> >> > Please revert ASAP.
>>> >> >
>>> >> >
>>> >> > Thanks,
>>> >>
>>> >>
>>> >> --
>>> >> Sönke Liebau
>>> >> Partner
>>> >> Tel. +49 179 7940878
>>> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>>
>>>
>>> --
>>> Sönke Liebau
>>> Partner
>>> Tel. +49 179 7940878
>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany