unReaLaneS commented on issue #37139:
URL: https://github.com/apache/arrow/issues/37139#issuecomment-1713694291

   @mapleFU Hi, sorry for taking so long. I managed to increase the RAM and it works now, but memory usage still grows roughly linearly as the parquet files are read from S3, so for now I have only worked around it by adding more RAM.
   
   But what I noticed regarding `iter_batches` and `to_batches` is that they behave differently on different OS environments.
   
   According to the documentation, both `iter_batches` and `to_batches` should yield the batches, and I understand that `to_batches` additionally has a read-ahead buffering mechanism.
   
   **macOS**:
   I was checking memory usage directly in Activity Monitor. `iter_batches` behaves as expected: memory usage stays reasonable for a while, although it does creep up slowly over time. With `to_batches`, however, the whole parquet file seems to be prebuffered with no limit, so a 1 GB parquet file ends up filling the 16 GB of RAM and the machine then starts swapping.
   
   **Linux**
   On Linux it is the other way around: `iter_batches` seems to load the entire file and fails almost immediately due to RAM usage, while `to_batches` does load in batches, even with prebuffering, and memory usage is OK at first, but it keeps increasing over time until the container is killed by AWS.
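   
   Rather than eyeballing Activity Monitor, I could also log the resident set size from inside the batch loop; a minimal sketch, assuming `psutil` is available in the container (the `log_rss` helper name is made up):
   
   ```
   import psutil
   
   def log_rss(logger, label):
       # Hypothetical helper: log the current process RSS in MiB,
       # e.g. once per batch, to see whether memory grows per batch or per file.
       rss_mib = psutil.Process().memory_info().rss / (1024 * 1024)
       logger.info(f"{label}: RSS = {rss_mib:.1f} MiB")
   ```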
   
   Here is the code:
   
   ```
       def get_all_s3_files(self):
           self.logger.info("Bucket name: " + os.environ.get('RP_BUCKET_NAME'))
           self.logger.info("Bucket resource path: " + os.environ.get('RP_BUCKET_RESOURCE_PATH'))
   
           self.logger.info("Return list of s3 parquet files")
   
           # Yield every parquet object under the configured prefix.
           for obj in self.my_bucket.objects.filter(Prefix=os.environ.get('RP_BUCKET_RESOURCE_PATH')):
               if obj.key.endswith('parquet'):
                   yield obj
   
       def read_from_s3(self, resource_path, batch_size=int(os.environ.get('READ_BULK_SIZE_FROM_PARQUET'))):
   
           self.logger.info(f"Bucket name: {resource_path.bucket_name}")
           self.logger.info(f"Bucket full path: {resource_path.bucket_name}/{resource_path.key}")
           self.logger.info(f"Loading from s3 file by batch size: {os.environ.get('READ_BULK_SIZE_FROM_PARQUET')}")
   
           # Read the table from S3 and yield it batch by batch as JSON records.
           for batch in pq.read_table(f"{resource_path.bucket_name}/{resource_path.key}", filesystem=self.s3_file_system)\
                   .to_batches(batch_size):
               yield json.loads(batch.to_pandas().to_json(orient="records"))
               del batch
               gc.collect()
   ```
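   
   For comparison, this is roughly how the same generator would look with `ParquetFile.iter_batches`, which streams record batches from the open file instead of first materializing the whole table with `pq.read_table`. Just a sketch under my assumptions (that `self.s3_file_system` is a `pyarrow.fs.S3FileSystem` with `open_input_file`; the method name is made up), not code I have actually run:
   
   ```
       def read_from_s3_streaming(self, resource_path, batch_size=int(os.environ.get('READ_BULK_SIZE_FROM_PARQUET'))):
           # Sketch only: open the S3 object and stream record batches,
           # instead of pq.read_table(...).to_batches(), which loads the
           # whole table into memory before slicing it.
           path = f"{resource_path.bucket_name}/{resource_path.key}"
           with self.s3_file_system.open_input_file(path) as f:
               parquet_file = pq.ParquetFile(f)
               for batch in parquet_file.iter_batches(batch_size=batch_size):
                   yield json.loads(batch.to_pandas().to_json(orient="records"))
   ```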
   
   I am reading the files from S3 in a loop, so they are read file by file.
   
   ```
               for item in s3_client.get_all_s3_files():
   
                   self.logger.info(f'Getting chunks from new file with key {item.key}')
   
                   response["no_of_parquet_files"] = response["no_of_parquet_files"] + 1
   
                   start_time = time.time()
   
                   for batch in s3_client.read_from_s3(item):
   ```
   This is how `read_from_s3` is called. I also thought it might be an issue with Python on Linux not releasing memory, but it happens on macOS as well. Issue #37630 might also be related. I also checked whether `to_pandas` could be the problem, but I have not found any other way to convert the data to JSON: with every other approach I tried, the data is not parsed correctly and completely wrong data is returned.
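   
   For reference, `RecordBatch.to_pylist()` returns the rows as a list of dicts without going through pandas; a minimal sketch (I have not verified it produces the same values as the `to_pandas().to_json()` round trip, e.g. timestamps come back as Python objects rather than strings):
   
   ```
   import pyarrow as pa
   
   batch = pa.RecordBatch.from_pydict({"id": [1, 2], "name": ["a", "b"]})
   
   # List of row dicts, no pandas/JSON round trip involved.
   rows = batch.to_pylist()
   # [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]
   ```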
   
   Thanks again, @mapleFU 
   
   

