yossibm opened a new issue, #13949:
URL: https://github.com/apache/arrow/issues/13949

   Reading 20 uncompressed Parquet files with a total size of 3.2 GB takes more than 12 GB of RAM when reading them "concurrently".
   
   "concurrently" means that I need to read the second file before closing the 
first file, not multithreading. 
   
   The data is a time series, so my program needs to read from all the files up to some timestamp and then proceed.
   
   I expect Arrow to use roughly the memory of a single batch multiplied by the number of files, but in reality the memory used is much more than the size of the entire files.
   
   The files were created with the pandas default configuration (using pyarrow), and reading them in Java gives the correct values.
   
   When reading each file in full and then closing it before opening the next one, the amount of RAM used is fine (see the sketch below).
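
   For reference, this is roughly the sequential pattern that behaves well. It is only a sketch, not my exact code: the method name `readFileFully` is illustrative, and it reuses the same imports, `allocator` field and `ScanOptions` as the full example at the end of this issue.

   ```
   // Sketch: read one file completely and release everything before opening the next.
   static void readFileFully(Path filePath) throws Exception {
       String uri = "file:" + filePath;
       ScanOptions options = new ScanOptions(/*batchSize*/ 64 * 1024 * 1024);
       try (DatasetFactory datasetFactory = new FileSystemDatasetFactory(
                    allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader reader = scanner.scan().iterator().next().execute()) {
           while (reader.loadNextBatch()) {
               VectorSchemaRoot root = reader.getVectorSchemaRoot();
               // consume/copy the columns here; root is reused for the next batch
           }
       } // all buffers for this file are released here, so memory stays bounded
   }
   ```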
   
   I have tried switching between the netty and unsafe memory JARs, but they give the same results.
   
   `-Darrow.memory.debug.allocator=true` did not produce any error.
   
   To limit the amount of direct memory (the excess memory is allocated outside of the JVM), I have tried replacing `NativeMemoryPool.getDefault()` with
   `NativeMemoryPool.createListenable(DirectReservationListener.instance())` or with
   `NativeMemoryPool.createListenable(.. some custom listener ..)` (the substitution is sketched after the stack trace below),
   
   but the result is this exception:
   ```
   Exception in thread "main" java.lang.RuntimeException: JNIEnv was not attached to current thread
        at org.apache.arrow.dataset.jni.JniWrapper.nextRecordBatch(Native Method)
        at org.apache.arrow.dataset.jni.NativeScanner$NativeReader.loadNextBatch(NativeScanner.java:134)
        at ParquetExample.main(ParquetExample.java:47)
   ```
   Using `-XX:MaxDirectMemorySize=1g` and `-Xmx4g` had no effect either.
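
   For completeness, the substitution itself was minimal; roughly the following (a sketch, using the `read_parquet_file` helper from the example below, with `DirectReservationListener` imported from `org.apache.arrow.dataset.jni`):

   ```
   // Sketch: swap the default pool for a listenable one backed by the JVM's
   // direct-memory accounting, then read as before.
   NativeMemoryPool pool =
           NativeMemoryPool.createListenable(DirectReservationListener.instance());
   ArrowReader arrowReader = read_parquet_file(filePath, pool);
   arrowReader.loadNextBatch(); // throws "JNIEnv was not attached to current thread"
   ```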
   
   The runtime uses the environment variable
   `_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED"`
   on JDK 17.0.2 with Arrow 9.0.0.
   
   The code is reduced to this simple example, adapted from the official documentation:
   
   ```
   import org.apache.arrow.dataset.file.FileFormat;
   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
   import org.apache.arrow.dataset.jni.NativeMemoryPool;
   import org.apache.arrow.dataset.scanner.ScanOptions;
   import org.apache.arrow.dataset.scanner.Scanner;
   import org.apache.arrow.dataset.source.Dataset;
   import org.apache.arrow.dataset.source.DatasetFactory;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.ipc.ArrowReader;
   
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.ArrayList;
   import java.util.List;
   
   public class ParquetExample {
   
       // The allocator limit does not affect the problem.
       static BufferAllocator allocator = new RootAllocator(128 * 1024 * 1024);
   
       // Opens one Parquet file and returns a reader positioned before its first batch.
       public static ArrowReader read_parquet_file(Path filePath, NativeMemoryPool nativeMemoryPool) {
           String uri = "file:" + filePath;
           ScanOptions options = new ScanOptions(/*batchSize*/ 64 * 1024 * 1024);
           try (
                   DatasetFactory datasetFactory = new FileSystemDatasetFactory(
                           allocator, nativeMemoryPool, FileFormat.PARQUET, uri);
                   Dataset dataset = datasetFactory.finish()
           ) {
               Scanner scanner = dataset.newScan(options);
               return scanner.scan().iterator().next().execute();
           } catch (Exception e) {
               throw new RuntimeException(e);
           }
       }
   
       public static void main(String[] args) throws IOException {
           // Keep readers open "concurrently": load one batch from each file without closing anything.
           List<VectorSchemaRoot> schemaRoots = new ArrayList<>();
           for (Path filePath : [...] ) { // 20 files, total uncompressed size 3.2GB
               ArrowReader arrowReader = read_parquet_file(filePath,
                       NativeMemoryPool.getDefault());
               if (arrowReader.loadNextBatch()) { // single batch read
                   schemaRoots.add(arrowReader.getVectorSchemaRoot());
               }
           }
       }
   }
   ```
   The question is: why is Arrow using so much memory in such a straightforward example, and why does replacing the NativeMemoryPool result in a crash?
   
   My guess is that the excessive memory comes from dictionary extraction, and that the JNI part of the code is reading the files in full. Maybe this would be solved if the NativeMemoryPool replacement worked?
   
   Thanks
   

