This is an automated email from the ASF dual-hosted git repository.
ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new 9870ca4 http/python: Simplify server.py and reduce client processing
time in half (#31)
9870ca4 is described below
commit 9870ca4e0902aff7d5af49984c9b4626c097a608
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Tue Aug 27 16:11:19 2024 -0300
http/python: Simplify server.py and reduce client processing time in half
(#31)
* http/python: Use response instead of full buffer as source of IPC stream
response is a file-like Python object, and as such it's not necessary to
allocate the entire buffer before creating the IPC stream.
This reduced the elapsed time in half while handling a 3GB response
generated by server.py: from 1.108s to 0.638s on my machine.
* http/python: Validate the Content-Type before parsing IPC stream
* http/python: Structure the resolution of Arrow data as it would in an
application
* http/python: Call truncate() on the sink after writing
Which will truncate the BytesIO to the current position.
* http/python: Use generators to simplify chunking of huge buffers
* http/python: Use getbuffer() instead of getvalue()
---
http/get_simple/python/client/client.py | 13 +++---
http/get_simple/python/server/server.py | 71 +++++++++++++++++++--------------
2 files changed, 48 insertions(+), 36 deletions(-)
diff --git a/http/get_simple/python/client/client.py b/http/get_simple/python/client/client.py
index 069da94..072f406 100644
--- a/http/get_simple/python/client/client.py
+++ b/http/get_simple/python/client/client.py
@@ -19,14 +19,18 @@ import urllib.request
import pyarrow as pa
import time
+ARROW_STREAM_FORMAT = 'application/vnd.apache.arrow.stream'
+
start_time = time.time()
-with urllib.request.urlopen('http://localhost:8008') as response:
- buffer = response.read()
+response = urllib.request.urlopen('http://localhost:8008')
+content_type = response.headers['Content-Type']
+if content_type != ARROW_STREAM_FORMAT:
+ raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
batches = []
-with pa.ipc.open_stream(buffer) as reader:
+with pa.ipc.open_stream(response) as reader:
schema = reader.schema
try:
while True:
@@ -35,13 +39,12 @@ with pa.ipc.open_stream(buffer) as reader:
pass
# or:
-#with pa.ipc.open_stream(buffer) as reader:
+#with pa.ipc.open_stream(response) as reader:
# schema = reader.schema
# batches = [b for b in reader]
end_time = time.time()
execution_time = end_time - start_time
-print(f"{len(buffer)} bytes received")
print(f"{len(batches)} record batches received")
print(f"{execution_time} seconds elapsed")
diff --git a/http/get_simple/python/server/server.py b/http/get_simple/python/server/server.py
index d0388a3..2001430 100644
--- a/http/get_simple/python/server/server.py
+++ b/http/get_simple/python/server/server.py
@@ -56,24 +56,48 @@ def GetPutData():
return batches
-def make_reader(schema, batches):
- return pa.RecordBatchReader.from_batches(schema, batches)
-
-def generate_batches(schema, reader):
+def generate_buffers(schema, source):
with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer:
- for batch in reader:
+ for batch in source:
sink.seek(0)
- sink.truncate(0)
writer.write_batch(batch)
- yield sink.getvalue()
+ sink.truncate()
+ with sink.getbuffer() as buffer:
+ yield buffer
sink.seek(0)
- sink.truncate(0)
writer.close()
- yield sink.getvalue()
+ sink.truncate()
+ with sink.getbuffer() as buffer:
+ yield buffer
+
+# def chunk_huge_buffer(view, max_chunk_size):
+# if len(view) <= max_chunk_size:
+# yield view
+# return
+# num_splits = len(view) // max_chunk_size
+# for i in range(num_splits):
+#         with view[i * max_chunk_size:i * max_chunk_size + max_chunk_size] as chunk:
+#             yield chunk
+# last_chunk_size = len(view) - (num_splits * max_chunk_size)
+# if last_chunk_size > 0:
+# with view[num_splits * max_chunk_size:] as chunk:
+# yield chunk
+
+# def generate_chunked_buffers(schema, source, max_chunk_size):
+# for buffer in generate_buffers(schema, source):
+# with memoryview(buffer) as view:
+# for chunk in chunk_huge_buffer(view, max_chunk_size):
+# yield chunk
class MyServer(BaseHTTPRequestHandler):
+ def resolve_batches(self):
+ return pa.RecordBatchReader.from_batches(schema, batches)
+
def do_GET(self):
+ ### given a source of record batches, this function sends them
+ ### to a client using HTTP chunked transfer encoding.
+ source = self.resolve_batches()
if self.request_version == 'HTTP/1.0':
self.protocol_version = 'HTTP/1.0'
@@ -99,34 +123,19 @@ class MyServer(BaseHTTPRequestHandler):
self.end_headers()
- for buffer in generate_batches(schema, make_reader(schema, batches)):
+ ### if any record batch could be larger than 2 GB, Python's
+ ### http.server will error when calling self.wfile.write(),
+ ### so you will need to split them into smaller chunks by
+ ### using the generate_chunked_buffers() function instead
+        ### of generate_buffers().
+ # for buffer in generate_chunked_buffers(schema, source, int(2e9)):
+ for buffer in generate_buffers(schema, source):
if chunked:
self.wfile.write('{:X}\r\n'.format(len(buffer)).encode('utf-8'))
self.wfile.write(buffer)
if chunked:
self.wfile.write('\r\n'.encode('utf-8'))
self.wfile.flush()
-
- ### if any record batch could be larger than 2 GB, Python's
- ### http.server will error when calling self.wfile.write(),
- ### so you will need to split them into smaller chunks by
- ### replacing the six lines above with this:
- #chunk_size = int(2e9)
- #chunk_splits = len(buffer) // chunk_size
- #for i in range(chunk_splits):
- # if chunked:
-        #        self.wfile.write('{:X}\r\n'.format(chunk_size).encode('utf-8'))
-        #    self.wfile.write(buffer[i * chunk_size:i * chunk_size + chunk_size])
- # if chunked:
- # self.wfile.write('\r\n'.encode('utf-8'))
- # self.wfile.flush()
- #last_chunk_size = len(buffer) - (chunk_splits * chunk_size)
- #if chunked:
-        #        self.wfile.write('{:X}\r\n'.format(last_chunk_size).encode('utf-8'))
- #self.wfile.write(buffer[chunk_splits * chunk_size:])
- #if chunked:
- # self.wfile.write('\r\n'.encode('utf-8'))
- #self.wfile.flush()
if chunked:
self.wfile.write('0\r\n\r\n'.encode('utf-8'))