Neighbor-Mr-Wang opened a new issue, #14713:
URL: https://github.com/apache/arrow/issues/14713
### Describe the usage question you have. Please include as many useful details as possible.
I need to upload a large number of small pictures to S3. I tested with pyarrow and boto3, and found that under the same conditions (85625 JPEG images, 9.2 GB in total, 8 processes), boto3 took about 100 s while pyarrow took about 500 s, so pyarrow is much slower than boto3. As far as I understand, pyarrow also uses boto under the hood, so why is pyarrow so much slower?
```python
from typing import List
from concurrent.futures import ProcessPoolExecutor, as_completed
from boto3.session import Session
import os
import time
from pyarrow import fs

NUM_WORKERS = 8


class S3Info(object):
    ACCESS_KEY = 'xxxx'
    SECRET_KEY = 'xxxx'
    ENDPOINT = 'http://xxxx'
    BUCKET = 'xxx'


def import_data(file_list: List[str]):
    # Split the file list into NUM_WORKERS chunks and upload them in parallel.
    worker_pool = ProcessPoolExecutor(max_workers=NUM_WORKERS)
    futures, fail_list = [], []
    length = len(file_list)
    step = int(length / NUM_WORKERS) + 1
    for i in range(0, length, step):
        sub_fd_list = file_list[i: i + step]
        futures.append(worker_pool.submit(put_list_files_by_arrow, sub_fd_list))  # 8 processes, takes ~500 s
        # or: futures.append(worker_pool.submit(put_list_files_by_boto, sub_fd_list))  # 8 processes, takes ~100 s
    for future in as_completed(futures):
        sub_fail_list = future.result()
        fail_list += sub_fail_list
    return fail_list


def put_list_files_by_boto(file_list: List[str]):
    dst_session = Session(aws_access_key_id=S3Info.ACCESS_KEY,
                          aws_secret_access_key=S3Info.SECRET_KEY)
    dst_s3 = dst_session.client("s3", endpoint_url=S3Info.ENDPOINT)
    dst_bucket = 'xxx'
    file_dir = '/data/test/10GJPEG'
    fail_list = []
    for file in file_list:
        try:
            file_path = os.path.join(file_dir, file)
            with open(file_path, 'rb') as f:
                data = f.read()
            dst_key = 'ds_test/put_test_3/' + file
            dst_s3.put_object(Body=data, Key=dst_key, Bucket=dst_bucket)
        except Exception:
            fail_list.append(file)
    return fail_list


def put_list_files_by_arrow(file_list: List[str]):
    handle = fs.S3FileSystem(
        access_key=S3Info.ACCESS_KEY,
        secret_key=S3Info.SECRET_KEY,
        endpoint_override=S3Info.ENDPOINT)
    dst_bucket = 'xxx'
    file_dir = '/data/test/10GJPEG'
    fail_list = []
    for file in file_list:
        try:
            file_path = os.path.join(file_dir, file)
            with open(file_path, 'rb') as f:
                data = f.read()
            # pyarrow S3 paths include the bucket name as the first path component.
            dst_key = dst_bucket + '/ds_test/put_test_3/' + file
            with handle.open_output_stream(dst_key) as f_new:
                f_new.write(data)
        except Exception:
            fail_list.append(file)
    return fail_list


if __name__ == "__main__":
    src_dir = '/data/test/10GJPEG'  # 85625 JPEG images, 9.2 GB in total
    files = os.listdir(src_dir)
    print('start .......')
    begin = time.time()
    import_data(files)
    end = time.time()
    print(f'{NUM_WORKERS} workers, use time: {end - begin}')
```
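
In case it helps narrow things down, below is a variant I am planning to test next, which hands the whole transfer to pyarrow via `fs.copy_files` instead of looping over `open_output_stream` per file. The bucket/prefix/endpoint values are placeholders just like in the script above, and `S3Info` refers to the same class.

```python
# Variant to test next: let pyarrow copy the whole local directory to S3 in one call.
# S3Info is the same placeholder class as in the script above.
from pyarrow import fs

def put_dir_by_arrow_copy_files(src_dir: str):
    s3 = fs.S3FileSystem(
        access_key=S3Info.ACCESS_KEY,
        secret_key=S3Info.SECRET_KEY,
        endpoint_override=S3Info.ENDPOINT,
        background_writes=True)  # default value; I plan to toggle it while profiling
    # copy_files walks the source directory recursively and uses threads internally.
    fs.copy_files(
        src_dir,
        'xxx/ds_test/put_test_3',  # destination: bucket name followed by key prefix
        source_filesystem=fs.LocalFileSystem(),
        destination_filesystem=s3,
        use_threads=True)

# put_dir_by_arrow_copy_files('/data/test/10GJPEG')
```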
### Component
Python