Sönke, thanks again. Yes, I replaced the non-Kafka code on our end with a simple print statement, as follows:
    do {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.of(timeout, ChronoUnit.MILLIS));
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            if (!infiniteLoop && !oneTimeMode) {
                --msgCount;
                if (msgCount < 0) {
                    break;
                }
            }
            Debugger.doPrint("value read:<" + record.value() + ">");
            /*outputViews.write(new BinaryOutput() {
                @Override
                public Document getHeader() {
                    return generateHeader(record, oldHeader);
                }

                @Override
                public void write(WritableByteChannel writeChannel) throws IOException {
                    try (OutputStream os = Channels.newOutputStream(writeChannel)) {
                        os.write(record.value());
                    }
                }
            });*/
            //The offset to commit should be the next offset of the current one,
            //according to the API
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            //In suggest mode, we should not change the current offset
            if (isSyncCommit && isSuggest) {
                commitOffset(offsets);
                offsets.clear();
            }
        }
    } while ((msgCount > 0 || infiniteLoop) && isRunning.get());

*Note:* Debugger is a wrapper class that just writes the given string to a local file using PrintStream's println() method.

And I don't see any difference in the metrics. I still see the huge amount of memory allocated. See the image attached.

Thanks,

On Mon, Mar 4, 2019 at 5:17 PM Sönke Liebau
<soenke.lie...@opencore.com.invalid> wrote:
> Hi Syed,
>
> let's keep it on the list for now so that everybody can participate :)
>
> The different .poll() method was just an unrelated observation. The
> main points of my mail were the question about whether this is the
> correct metric you are looking at, and replacing the payload of your
> code with a println statement to remove non-Kafka code from your
> program and make sure that the leak is not in there. Have you tried
> that?
>
> Best regards,
> Sönke
>
> On Mon, Mar 4, 2019 at 7:21 AM Syed Mudassir Ahmed
> <syed.mudas...@gaianconsultants.com> wrote:
> >
> > Sönke,
> > Thanks so much for the reply.
> > I used the new version of the poll(Duration) method. Still, I see the memory issue.
> > Is there a way we can get on a one-on-one call and discuss this, please? Let me
> > know your availability. I can share a Zoom meeting link.
> >
> > Thanks,
> >
> > On Sat, Mar 2, 2019 at 2:15 AM Sönke Liebau
> > <soenke.lie...@opencore.com.invalid> wrote:
> >>
> >> Hi Syed,
> >>
> >> from your screenshot I assume that you are using SnapLogic to run your
> >> code (full disclosure: I do not have the faintest idea of this
> >> product!). I've just had a look at the docs and am a bit confused by
> >> their explanation of the metric that you point out in your image,
> >> "Memory Allocated". The docs say: "The Memory Allocated reflects the
> >> number of bytes that were allocated by the Snap. Note that this
> >> number does not reflect the amount of memory that was freed and it is
> >> not the peak memory usage of the Snap. So, it is not necessarily a
> >> metric that can be used to estimate the required size of a Snaplex
> >> node. Rather, the number provides an insight into how much memory had
> >> to be allocated to process all of the documents. For example, if the
> >> total allocated was 5MB and the Snap processed 32 documents, then the
> >> Snap allocated roughly 164KB per document. When combined with the
> >> other statistics, this number can help to identify the potential
> >> causes of performance issues."
> >> The part about not reflecting memory that was freed makes me somewhat
> >> doubtful whether this actually reflects how much memory the process
> >> currently holds. Can you give some more insight there?
> >>
> >> Apart from that, I just ran your code somewhat modified to make it
> >> work without dependencies for 2 hours and saw no unusual memory
> >> consumption, just a regular garbage collection sawtooth pattern.
> >> That being said, I had to replace your actual processing with a simple
> >> println, so if there is a memory leak in there I would of course not
> >> have noticed.
> >> I've uploaded the code I ran [1] for reference. For further analysis,
> >> maybe you could run something similar with just a println or no-op and
> >> see if the symptoms persist, to localize the leak (if it exists).
> >>
> >> Also, two random observations on your code:
> >>
> >> KafkaConsumer.poll(long timeout) is deprecated; you should consider
> >> using the overloaded version with a Duration parameter instead.
> >>
> >> The comment at [2] seems to contradict the following code, as the
> >> offsets are only changed when in suggest mode. But as I have no idea
> >> what suggest mode even is, or what all this means, this observation
> >> may be miles off point :)
> >>
> >> I hope that helps a little.
> >>
> >> Best regards,
> >> Sönke
> >>
> >> [1] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983
> >> [2] https://gist.github.com/soenkeliebau/e77e8665a1e7e49ade9ec27a6696e983#file-memoryleak-java-L86
> >>
> >> On Fri, Mar 1, 2019 at 7:35 AM Syed Mudassir Ahmed
> >> <syed.mudas...@gaianconsultants.com> wrote:
> >> >
> >> > Thanks,
> >> >
> >> > ---------- Forwarded message ---------
> >> > From: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> >> > Date: Tue, Feb 26, 2019 at 12:40 PM
> >> > Subject: Apache Kafka Memory Leakage???
> >> > To: <us...@kafka.apache.org>
> >> > Cc: Syed Mudassir Ahmed <syed.mudas...@gaianconsultants.com>
> >> >
> >> > Hi Team,
> >> > I have a Java application based on the latest Apache Kafka version 2.1.1.
> >> > I have a consumer application that runs infinitely to consume messages
> >> > whenever produced. Sometimes there are no messages produced for hours.
> >> > Still, I see that the memory allocated to the consumer program is
> >> > drastically increasing.
> >> > My code is as follows:
> >> >
> >> >     AtomicBoolean isRunning = new AtomicBoolean(true);
> >> >
> >> >     Properties kafkaProperties = new Properties();
> >> >     kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
> >> >     kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
> >> >     kafkaProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
> >> >     kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
> >> >     kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_EARLIEST);
> >> >     consumer = new KafkaConsumer<byte[], byte[]>(kafkaProperties, keyDeserializer, valueDeserializer);
> >> >     if (topics != null) {
> >> >         subscribeTopics(topics);
> >> >     }
> >> >
> >> >     boolean infiniteLoop = false;
> >> >     boolean oneTimeMode = false;
> >> >     int timeout = consumeTimeout;
> >> >     if (isSuggest) {
> >> >         //Configuration for suggest mode
> >> >         oneTimeMode = true;
> >> >         msgCount = 0;
> >> >         timeout = DEFAULT_CONSUME_TIMEOUT_IN_MS;
> >> >     } else if (msgCount < 0) {
> >> >         infiniteLoop = true;
> >> >     } else if (msgCount == 0) {
> >> >         oneTimeMode = true;
> >> >     }
> >> >     Map<TopicPartition, OffsetAndMetadata> offsets = Maps.newHashMap();
> >> >     do {
> >> >         ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
> >> >         for (final ConsumerRecord<byte[], byte[]> record : records) {
> >> >             if (!infiniteLoop && !oneTimeMode) {
> >> >                 --msgCount;
> >> >                 if (msgCount < 0) {
> >> >                     break;
> >> >                 }
> >> >             }
> >> >             outputViews.write(new BinaryOutput() {
> >> >                 @Override
> >> >                 public Document getHeader() {
> >> >                     return generateHeader(record, oldHeader);
> >> >                 }
> >> >
> >> >                 @Override
> >> >                 public void write(WritableByteChannel writeChannel) throws IOException {
> >> >                     try (OutputStream os = Channels.newOutputStream(writeChannel)) {
> >> >                         os.write(record.value());
> >> >                     }
> >> >                 }
> >> >             });
> >> >             //The offset to commit should be the next offset of the current one,
> >> >             //according to the API
> >> >             offsets.put(new TopicPartition(record.topic(), record.partition()),
> >> >                     new OffsetAndMetadata(record.offset() + 1));
> >> >             //In suggest mode, we should not change the current offset
> >> >             if (isSyncCommit && isSuggest) {
> >> >                 commitOffset(offsets);
> >> >                 offsets.clear();
> >> >             }
> >> >         }
> >> >     } while ((msgCount > 0 || infiniteLoop) && isRunning.get());
> >> >
> >> > See the screenshot below. In about nineteen hours, it consumed just
> >> > 5 messages, but the memory allocated is 1.6GB.
> >> >
> >> > Any clues on how to get rid of the memory issue? Is there anything I
> >> > need to do in the program, or is it a bug in the Kafka library?
> >> >
> >> > Please revert ASAP.
> >> >
> >> > Thanks,
> >>
> >> --
> >> Sönke Liebau
> >> Partner
> >> Tel. +49 179 7940878
> >> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
> --
> Sönke Liebau
> Partner
> Tel. +49 179 7940878
> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>
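One detail from the thread worth making concrete is the offset arithmetic: per the KafkaConsumer API, the offset you commit should be the position of the *next* record to consume, which is why the code above stores record.offset() + 1. The sketch below isolates just that bookkeeping in plain Java, with no broker required; OffsetTracker and the "events-0" topic-partition key are hypothetical names for illustration, not code from this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the commit-offset bookkeeping
// discussed in the thread; not part of the original code.
public class OffsetTracker {
    // Maps a "topic-partition" key to the next offset to commit.
    private final Map<String, Long> offsets = new HashMap<>();

    // After processing the record at `offset`, the value to commit is
    // offset + 1: the committed offset is where the consumer resumes.
    public void markProcessed(String topicPartition, long offset) {
        offsets.put(topicPartition, offset + 1);
    }

    public Map<String, Long> snapshot() {
        return new HashMap<>(offsets);
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markProcessed("events-0", 41);
        tracker.markProcessed("events-0", 42);
        // After processing offsets 41 and 42, the next offset to commit is 43.
        System.out.println(tracker.snapshot().get("events-0"));
    }
}
```

Committing record.offset() itself would cause the last processed record to be re-delivered after a restart, which is the off-by-one the code comment guards against.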