To be clear: I added a logger to my BufferedReader, so I know for a fact
that it is reading data.

And as for the code, it is a very simple parallel forEach --
someStream.parallel().forEach(work); -- I only wanted to change the
execution from sequential to parallel. So there are millions and millions
of lines being read from the source (1 million lines alone is several
batches), and none of them are entering the forEach loop. And I know this
because the very first thing my forEach loop does is print logs.

As for the iterator, it is literally just a hand-rolled iterator (that I
copied from StackOverflow) with an instance field holding a list of
elements, sized to match my batch size. hasNext() prepares the next batch,
stores it into that instance field, and returns true/false depending on
whether there is data to send; next() then returns that instance field's
prepared batch. Preparing is just making a new list and storing a batch
size's worth of elements in it. So this means that it is grabbing batch
after batch, but not letting any of them through into the forEach loop.
And when I turn off parallelism, they suddenly go through just fine, one
after the other.
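In case it helps, here is a rough sketch of what that wrapper looks like
(simplified and renamed, not my exact code):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Wraps an Iterator<T> into an Iterator<List<T>> of fixed-size batches. */
final class BatchingIterator<T> implements Iterator<List<T>> {

    private final Iterator<T> source;
    private final int batchSize;
    private List<T> nextBatch; // prepared by hasNext(), handed out by next()

    BatchingIterator(Iterator<T> source, int batchSize) {
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        if (nextBatch == null) {
            List<T> batch = new ArrayList<>(batchSize);
            while (source.hasNext() && batch.size() < batchSize) {
                batch.add(source.next());
            }
            if (!batch.isEmpty()) {
                nextBatch = batch;
            }
        }
        return nextBatch != null;
    }

    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> result = nextBatch;
        nextBatch = null;
        return result;
    }

    /** Rebuilds a stream of batches on top of the wrapped iterator. */
    static <T> Stream<List<T>> batches(Stream<T> source, int batchSize) {
        Iterator<List<T>> it =
                new BatchingIterator<>(source.iterator(), batchSize);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED),
                false); // flipping this to true is what blows up
    }
}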
On Sat, Oct 19, 2024, 7:12 AM Olexandr Rotan <rotanolexandr...@gmail.com>
wrote:

> Hi David. I am not on the core libs team, but I think I can offer some
> clues :).
>
> It is hard to tell without the code, but I assume there are a few layers
> to it.
>
> 1. Stalling. I would assume it is caused mostly by GC pauses taking too
> long (forever) when the GC has no computational power left to run on.
> There is a fairly common GC-pause-related issue where a database
> connection is interrupted with a "Broken pipe" exception, which under the
> hood is caused by the connection to the database timing out during a long
> GC pause on low memory. I am not saying this is your case, but if I were
> to guess, I would assume the stall is caused by low memory.
>
> 2. The root cause of the out-of-memory error may be too much splitting of
> your data-source input. You could try to limit it by modifying the
> behaviour of the trySplit method of your spliterator.
>
> Alternatively, if you don't mind taking up some disk space, you can try
> to stream the data into a file, save it, and then use memory-mapped
> buffers (java.nio.MappedByteBuffer) to process the accepted data. I am
> not sure this will work, but memory-mapped files are a common tool for
> dealing with data that can't fit into RAM.
>
> Regards
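If I understand the trySplit suggestion correctly, it would look something
like this -- an untested sketch with invented names, where each split hands
off a single element (a batch, in my case) and splitting stops after a
fixed budget, so at most maxSplits elements sit buffered ahead of
processing:

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

/** Iterator-backed spliterator that caps how often it may be split. */
final class CappedSpliterator<T> implements Spliterator<T> {

    private final Iterator<T> source;
    private int splitsLeft;

    CappedSpliterator(Iterator<T> source, int maxSplits) {
        this.source = source;
        this.splitsLeft = maxSplits;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) {
            return false;
        }
        action.accept(source.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        if (splitsLeft <= 0 || !source.hasNext()) {
            return null; // refuse to split; the rest is consumed sequentially
        }
        splitsLeft--;
        // Hand off exactly one element, instead of letting the default
        // iterator-backed spliterator copy a large (and growing) array of
        // elements out of the source on every split.
        return Spliterators.spliterator(
                new Object[] { source.next() }, characteristics());
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // size unknown
    }

    @Override
    public int characteristics() {
        return ORDERED;
    }
}

How much parallelism this actually buys would depend on how the pipeline
drives the spliterator, so treat it as a starting point rather than a fix.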
> On Sat, Oct 19, 2024 at 8:54 AM David Alayachew <davidalayac...@gmail.com>
> wrote:
>
>> Hello Core Libs Dev Team,
>>
>> I have a file that I am streaming from a service, and I am trying to
>> split it into multiple parts based on a certain attribute found on each
>> line. I am sending each part up to a different service.
>>
>> I am using BufferedReader.lines(). However, I cannot read the whole file
>> into memory, because it is larger than the amount of RAM that I have on
>> the machine. So, since I don't have access to Java 22's preview
>> Gatherers.windowFixed, I used the iterator() method on my stream,
>> wrapped that in another iterator that grabs a batch size's worth of
>> data, then built a spliterator from that and used it to create a new
>> stream. In short, this wrapper iterator isn't an Iterator<T>, it's an
>> Iterator<List<T>>.
>>
>> When I ran this sequentially, everything worked well. However, my CPU
>> utilization was low, and we definitely have a performance problem -- our
>> team needs this number as fast as we can get it. Plus, we had plenty of
>> network bandwidth to spare, so I had (imo) good reason to use
>> parallelism.
>>
>> As soon as I turned on parallelism, the stream's behaviour changed
>> completely. Instead of fetching a batch and processing it, it started
>> grabbing SEVERAL BATCHES and processing NONE OF THEM. Or at the very
>> least, it grabbed so many batches that it ran out of memory before it
>> could get to processing them.
>>
>> To give some numbers, this is a 4-core machine, and we can safely hold
>> about 30-40 batches' worth of data in memory before crashing. But again,
>> when running sequentially, this thing only grabs 1 batch, processes that
>> one batch, sends out the results, and then starts on the next one, all
>> as expected. I thought that adding parallelism would simply make this
>> happen 4 or 8 times at once.
>>
>> After a very long period of digging, I managed to find this link.
>>
>> https://stackoverflow.com/questions/30825708/java-8-using-parallel-in-a-stream-causes-oom-error
>>
>> Tagir Valeev gives an answer there, but it doesn't go very deep into the
>> "why" at all, and it is directed more at that user's specific question
>> than at solving this problem in general.
>>
>> After digging through a bunch of other solutions (plus my own testing),
>> it seems that the engine that does parallelization for Streams tries to
>> grab a large enough "buffer" before doing any parallel processing. I
>> could be wrong, and how large that buffer is, I have no idea.
>>
>> Regardless, that's about where I gave up and went sequential, since the
>> clock was ticking.
>>
>> But I still have a performance problem. How would one suggest going
>> about this in Java 8?
>>
>> Thank you for your time and help.
>> David Alayachew
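To illustrate the buffering I described above: the iterator-backed
spliterator that spliteratorUnknownSize hands back appears to drain about
1024 elements from the source into an array on the first split, and about
1024 more than that on each following split. A small sketch (invented
names, untested) that should make the prefetching visible:

import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

/** Counts how far the source has been drained before processing starts. */
public class PrefetchDemo {
    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();

        // A source that records how many elements have been pulled so far.
        Iterator<Integer> source = new Iterator<Integer>() {
            private int next = 0;
            @Override public boolean hasNext() { return next < 10_000; }
            @Override public Integer next() {
                pulled.incrementAndGet();
                return next++;
            }
        };

        // With the second argument true (parallel), "pulled so far" races
        // far ahead of the elements actually processed; run it sequentially
        // (false) and the two numbers stay in lockstep.
        StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(source, 0), true)
            .forEach(i -> System.out.println(
                "processed " + i + ", pulled so far: " + pulled.get()));
    }
}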