samredai edited a comment on pull request #3691:
URL: https://github.com/apache/iceberg/pull/3691#issuecomment-1005059114
As an example of what an implementation of these base classes could look
like, I put together an S3FileIO implementation that uses smart_open to create
seekable file-like objects (`smart_open.s3.Reader` instances), and verified
that these can be passed directly to pyarrow. I also verified that
`smart_open.s3.MultipartWriter` instances work as the `where` argument to
pyarrow's `write_table` functions.
### Implementation, `s3.py`
```py
from iceberg.io.base import FileIO, InputFile, OutputFile
from smart_open import open, parse_uri
import boto3
class S3InputFile(InputFile):
    """An InputFile backed by an S3 object, read through smart_open."""

    def __len__(self) -> int:
        """Return the size in bytes of the S3 object at ``self.location``.

        Issues an S3 HEAD request through boto3. boto3 raises its client error
        if the object does not exist or cannot be accessed.
        """
        # Previously a hard-coded placeholder (always 0); ask S3 for the real size.
        uri = parse_uri(self.location)
        response = boto3.client("s3").head_object(Bucket=uri.bucket_id, Key=uri.key_id)
        return response["ContentLength"]

    @property
    def exists(self) -> bool:
        """Return True if the object at ``self.location`` can be opened for reading."""
        try:
            with open(self.location, "rb"):
                pass
        except OSError:
            # smart_open is expected to raise OSError for a missing/inaccessible object.
            return False
        return True

    def __enter__(self):
        """Open and return a seekable binary reader for ``self.location``.

        The returned object can be passed directly to e.g. ``pyarrow.parquet.read_table``.
        """
        # defer_seek: don't issue the initial GET until the first read/seek.
        self._stream = open(self.location, "rb", transport_params={"defer_seek": True})
        return self._stream

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Close the reader opened by ``__enter__``; exceptions are not suppressed."""
        self._stream.close()
class S3OutputFile(OutputFile):
    """An OutputFile backed by an S3 object, written through smart_open."""

    def __call__(self, overwrite: bool = False, **kwargs):
        """Set whether an existing object may be overwritten and return ``self``.

        Extra keyword arguments are accepted and ignored.
        """
        self._overwrite = overwrite
        return self

    def __len__(self) -> int:
        """Return the size of the object currently at ``self.location``.

        Delegates to ``S3InputFile.__len__`` for the same location instead of
        returning a hard-coded 0.
        """
        return len(self.to_input_file())

    @property
    def location(self) -> str:
        """The fully-qualified location of the output file"""
        return self._location

    @property
    def exists(self) -> bool:
        """Return True if an object already exists at ``self.location``.

        Delegates to the matching S3InputFile so the existence check is
        implemented in one place.
        """
        return self.to_input_file().exists

    def to_input_file(self) -> S3InputFile:
        """Return an S3InputFile pointing at the same location."""
        return S3InputFile(self.location)

    def __enter__(self):
        """Open and return a binary writer for ``self.location``.

        Raises:
            FileExistsError: if an object already exists and overwrite is False.
        """
        # NOTE(review): _overwrite is set by __call__ or, presumably, by the
        # base-class constructor (S3FileIO.new_output passes overwrite=...) - confirm.
        if not self._overwrite and self.exists:
            raise FileExistsError(
                f"{self.location} already exists. To overwrite, "
                "set overwrite=True when initializing the S3OutputFile."
            )
        self._stream = open(self.location, "wb")
        return self._stream

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Finish writing the stream opened by ``__enter__``; exceptions are not suppressed.

        Unconditionally calling close() would complete the S3 upload even when the
        ``with`` body raised, publishing a truncated object. Delegating to the
        stream's own context-manager exit is a plain close() for ordinary file
        objects; smart_open's S3 writers abort the upload when an exception is
        pending (NOTE(review): confirm for the installed smart_open version).
        """
        self._stream.__exit__(exc_type, exc_value, exc_traceback)
class S3FileIO(FileIO):
    """FileIO that hands out S3-backed input/output files and deletes S3 objects."""

    def new_input(self, location: str):
        """Return an S3InputFile for ``location``."""
        input_file = S3InputFile(location=location)
        return input_file

    def new_output(self, location: str, overwrite: bool = False):
        """Return an S3OutputFile for ``location``; ``overwrite`` is forwarded to it."""
        output_file = S3OutputFile(location=location, overwrite=overwrite)
        return output_file

    def delete(self, location: str):
        """Delete the S3 object at ``location`` via a boto3 S3 resource."""
        parsed = parse_uri(location)
        bucket, key = parsed.bucket_id, parsed.key_id
        boto3.resource('s3').Object(bucket, key).delete()
```
### `example.py`
```py
from pyarrow import parquet as pq
from s3 import S3FileIO
f1 = "s3://samstestbucket3412/userdata1.parquet"
f2 = "s3://samstestbucket3412/userdata2.parquet"
file_io = S3FileIO()

# Read f1 (a parquet file)
with file_io.new_input(f1) as f:
    table = pq.read_table(f)

# see output below
print("####################\n")
print(type(table), end="\n\n")
print(table, end="\n\n")
print("####################\n")

# Delete f2 if it exists
file_io.delete(f2)

# Write the pyarrow table to f2
with file_io.new_output(f2) as f:
    pq.write_table(table, f)

# Read the newly written table back in
with file_io.new_input(f2) as f:
    table2 = pq.read_table(f)

# see output below -- print the round-tripped table2, not the original table,
# so the output actually reflects what was read back from f2
print("####################\n")
print(type(table2), end="\n\n")
print(table2, end="\n\n")
print("####################\n")

# Try to write to f2 again without overwrite=True
# with file_io.new_output(f2) as f:
#     pq.write_table(table, f)  # Raises a FileExistsError

# Writing f2 again with overwrite=True
with file_io.new_output(f2, overwrite=True) as f:
    pq.write_table(table2, f)

# Delete f2
file_io.delete(f2)

# Write f2 without setting overwrite since it's been deleted
with file_io.new_output(f2) as f:
    pq.write_table(table2, f)
```
output:
```
####################
<class 'pyarrow.lib.Table'>
pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string
####################
####################
<class 'pyarrow.lib.Table'>
pyarrow.Table
registration_dttm: timestamp[ns]
id: int32
first_name: string
last_name: string
email: string
gender: string
ip_address: string
cc: string
country: string
birthdate: string
salary: double
title: string
comments: string
####################
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]