samredai edited a comment on pull request #3691:
URL: https://github.com/apache/iceberg/pull/3691#issuecomment-1005059114


   As an example of what an implementation of these base classes would look 
like, I put together an S3FileIO implementation using smart_open to create 
seekable file-like objects (`smart_open.s3.Reader` instances) and validated 
that this can be fed directly into pyarrow. I also validated that 
`smart_open.s3.MultipartWriter` instances work as the `where` argument to 
pyarrow's `write_table` methods.
   ### Implementation, `s3.py`
   ```py
   from iceberg.io.base import FileIO, InputFile, OutputFile
   from smart_open import open, parse_uri
   import boto3
   
   class S3InputFile(InputFile):
   
       def __len__(self) -> int:
           return 0
       
       @property
       def exists(self) -> bool:
           try:
               with open(self.location, 'rb') as f:
                   pass
           except OSError:
               return False
           return True
       
       def __enter__(self):
           self._stream = open(self.location, 'rb', 
transport_params={"defer_seek": True})
           return self._stream
       
       def __exit__(self, exc_type, exc_value, exc_traceback):
           self._stream.close()
           return
   
   
   class S3OutputFile(OutputFile):
   
       def __call__(self, overwrite: bool = False, **kwargs):
           self._overwrite = overwrite
           return self
   
       def __len__(self) -> int:
           return 0
       
       @property
       def location(self) -> str:
           """The fully-qualified location of the output file"""
           return self._location
   
       @property
       def exists(self) -> bool:
           try:
               with open(self.location, 'rb') as f:
                   pass
           except OSError:
               return False
           return True
       
       def to_input_file(self) -> S3InputFile:
           return S3InputFile(self.location)
   
       def __enter__(self):
           if not self._overwrite and self.exists:
               raise FileExistsError(
                   f"{self.location} already exists. To overwrite, "
                   "set overwrite=True when initializing the S3OutputFile."
               )
               
           self._stream = open(self.location, 'wb')
           return self._stream
       
       def __exit__(self, exc_type, exc_value, exc_traceback):
           self._stream.close()
           return
   
   class S3FileIO(FileIO):
       def new_input(self, location: str):
           return S3InputFile(location=location)
   
       def new_output(self, location: str, overwrite: bool = False):
           return S3OutputFile(location=location, overwrite=overwrite)
   
       def delete(self, location: str):
           uri = parse_uri(location)
           s3 = boto3.resource('s3')
           s3.Object(uri.bucket_id, uri.key_id).delete()
           return
   ```
   ### `example.py`
   ```py
    # Demo: exercise S3FileIO end-to-end against a live S3 bucket
    # (read, round-trip write, overwrite protection, delete). Requires
    # AWS credentials with access to the bucket below.
    from pyarrow import parquet as pq
    from s3 import S3FileIO
    
    f1 = "s3://samstestbucket3412/userdata1.parquet"
    f2 = "s3://samstestbucket3412/userdata2.parquet"
    file_io = S3FileIO()
    
    # Read f1 (a parquet file)
    with file_io.new_input(f1) as f:
        table = pq.read_table(f)
    
    # see output below
    print("####################\n")
    print(type(table), end="\n\n")
    print(table, end="\n\n")
    print("####################\n")
    
    # Delete f2 if it exists
    file_io.delete(f2)
    
    # Write the pyarrow table to f2
    with file_io.new_output(f2) as f:
        pq.write_table(table, f)
    
    # Read the newly written table back in
    with file_io.new_input(f2) as f:
        table2 = pq.read_table(f)
    
    # see output below
    # NOTE(review): these prints show `table` again, not the re-read
    # `table2` — presumably table2 was intended; verify.
    print("####################\n")
    print(type(table), end="\n\n")
    print(table, end="\n\n")
    print("####################\n")
    
    
    # Try to write to f2 again without overwrite=True
    # with file_io.new_output(f2) as f:
    #     pq.write_table(table, f)  # Raises a FileExistsError
    
    # Writing f2 again with overwrite=True
    with file_io.new_output(f2, overwrite=True) as f:
        pq.write_table(table2, f)
    
    # Delete f2
    file_io.delete(f2)
    
    # Write f2 without setting overwrite since it's been deleted
    with file_io.new_output(f2) as f:
        pq.write_table(table2, f)
   ```
   output:
   ```
   ####################
   
   <class 'pyarrow.lib.Table'>
   
   pyarrow.Table
   registration_dttm: timestamp[ns]
   id: int32
   first_name: string
   last_name: string
   email: string
   gender: string
   ip_address: string
   cc: string
   country: string
   birthdate: string
   salary: double
   title: string
   comments: string
   
   ####################
   
   ####################
   
   <class 'pyarrow.lib.Table'>
   
   pyarrow.Table
   registration_dttm: timestamp[ns]
   id: int32
   first_name: string
   last_name: string
   email: string
   gender: string
   ip_address: string
   cc: string
   country: string
   birthdate: string
   salary: double
   title: string
   comments: string
   
   ####################
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to