This is an automated email from the ASF dual-hosted git repository.

ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git


The following commit(s) were added to refs/heads/main by this push:
     new 9870ca4  http/python: Simplify server.py and reduce client processing 
time in half (#31)
9870ca4 is described below

commit 9870ca4e0902aff7d5af49984c9b4626c097a608
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Tue Aug 27 16:11:19 2024 -0300

    http/python: Simplify server.py and reduce client processing time in half 
(#31)
    
    * http/python: Use response instead of full buffer as source of IPC stream
    
    response is a file-like Python object, and as such it's not necessary to
    allocate the entire buffer before creating the IPC stream.
    
    This cut the elapsed time in half while handling a 3GB response
    generated by server.py: from 1.108s to 0.638s on my machine.
    
    * http/python: Validate the Content-Type before parsing IPC stream
    
    * http/python: Structure the resolution of Arrow data as it would be in an 
application
    
    * http/python: Call truncate() on the sink after writing
    
    This truncates the BytesIO to the current position.
    
    * http/python: Use generators to simplify chunking of huge buffers
    
    * http/python: Use getbuffer() instead of getvalue()
---
 http/get_simple/python/client/client.py | 13 +++---
 http/get_simple/python/server/server.py | 71 +++++++++++++++++++--------------
 2 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/http/get_simple/python/client/client.py 
b/http/get_simple/python/client/client.py
index 069da94..072f406 100644
--- a/http/get_simple/python/client/client.py
+++ b/http/get_simple/python/client/client.py
@@ -19,14 +19,18 @@ import urllib.request
 import pyarrow as pa
 import time
 
+ARROW_STREAM_FORMAT = 'application/vnd.apache.arrow.stream'
+
 start_time = time.time()
 
-with urllib.request.urlopen('http://localhost:8008') as response:
-  buffer = response.read()
+response = urllib.request.urlopen('http://localhost:8008')
+content_type = response.headers['Content-Type']
+if content_type != ARROW_STREAM_FORMAT:
+  raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
 
 batches = []
 
-with pa.ipc.open_stream(buffer) as reader:
+with pa.ipc.open_stream(response) as reader:
   schema = reader.schema
   try:
     while True:
@@ -35,13 +39,12 @@ with pa.ipc.open_stream(buffer) as reader:
       pass
 
 # or:
-#with pa.ipc.open_stream(buffer) as reader:
+#with pa.ipc.open_stream(response) as reader:
 #  schema = reader.schema
 #  batches = [b for b in reader]
 
 end_time = time.time()
 execution_time = end_time - start_time
 
-print(f"{len(buffer)} bytes received")
 print(f"{len(batches)} record batches received")
 print(f"{execution_time} seconds elapsed")
diff --git a/http/get_simple/python/server/server.py 
b/http/get_simple/python/server/server.py
index d0388a3..2001430 100644
--- a/http/get_simple/python/server/server.py
+++ b/http/get_simple/python/server/server.py
@@ -56,24 +56,48 @@ def GetPutData():
     
     return batches
 
-def make_reader(schema, batches):
-    return pa.RecordBatchReader.from_batches(schema, batches)
-
-def generate_batches(schema, reader):
+def generate_buffers(schema, source):
     with io.BytesIO() as sink, pa.ipc.new_stream(sink, schema) as writer:
-        for batch in reader:
+        for batch in source:
             sink.seek(0)
-            sink.truncate(0)
             writer.write_batch(batch)
-            yield sink.getvalue()
+            sink.truncate()
+            with sink.getbuffer() as buffer:
+                yield buffer
         
         sink.seek(0)
-        sink.truncate(0)
         writer.close()
-        yield sink.getvalue()
+        sink.truncate()
+        with sink.getbuffer() as buffer:
+            yield buffer
+
+# def chunk_huge_buffer(view, max_chunk_size):
+#     if len(view) <= max_chunk_size:
+#         yield view
+#         return
+#     num_splits = len(view) // max_chunk_size
+#     for i in range(num_splits):
+#         with view[i * max_chunk_size:i * max_chunk_size + max_chunk_size] as 
chunk:
+#             yield chunk
+#     last_chunk_size = len(view) - (num_splits * max_chunk_size)
+#     if last_chunk_size > 0:
+#         with view[num_splits * max_chunk_size:] as chunk:
+#             yield chunk
+
+# def generate_chunked_buffers(schema, source, max_chunk_size):
+#     for buffer in generate_buffers(schema, source):
+#         with memoryview(buffer) as view:
+#             for chunk in chunk_huge_buffer(view, max_chunk_size):
+#                 yield chunk
  
 class MyServer(BaseHTTPRequestHandler):
+    def resolve_batches(self):
+        return pa.RecordBatchReader.from_batches(schema, batches)
+
     def do_GET(self):
+        ### given a source of record batches, this function sends them
+        ### to a client using HTTP chunked transfer encoding.
+        source = self.resolve_batches()
 
         if self.request_version == 'HTTP/1.0':
             self.protocol_version = 'HTTP/1.0'
@@ -99,34 +123,19 @@ class MyServer(BaseHTTPRequestHandler):
         
         self.end_headers()
         
-        for buffer in generate_batches(schema, make_reader(schema, batches)):
+        ### if any record batch could be larger than 2 GB, Python's
+        ### http.server will error when calling self.wfile.write(),
+        ### so you will need to split them into smaller chunks by
+        ### using the generate_chunked_buffers() function instead
+        ### of generate_buffers().
+        # for buffer in generate_chunked_buffers(schema, source, int(2e9)):
+        for buffer in generate_buffers(schema, source):
             if chunked:
                 
self.wfile.write('{:X}\r\n'.format(len(buffer)).encode('utf-8'))
             self.wfile.write(buffer)
             if chunked:
                 self.wfile.write('\r\n'.encode('utf-8'))
             self.wfile.flush()
-            
-            ### if any record batch could be larger than 2 GB, Python's
-            ### http.server will error when calling self.wfile.write(),
-            ### so you will need to split them into smaller chunks by 
-            ### replacing the six lines above with this:
-            #chunk_size = int(2e9)
-            #chunk_splits = len(buffer) // chunk_size
-            #for i in range(chunk_splits):
-            #    if chunked:
-            #        
self.wfile.write('{:X}\r\n'.format(chunk_size).encode('utf-8'))
-            #    self.wfile.write(buffer[i * chunk_size:i * chunk_size + 
chunk_size])
-            #    if chunked:
-            #        self.wfile.write('\r\n'.encode('utf-8'))
-            #    self.wfile.flush()
-            #last_chunk_size = len(buffer) - (chunk_splits * chunk_size)
-            #if chunked:
-            #    
self.wfile.write('{:X}\r\n'.format(last_chunk_size).encode('utf-8'))
-            #self.wfile.write(buffer[chunk_splits * chunk_size:])
-            #if chunked:
-            #    self.wfile.write('\r\n'.encode('utf-8'))
-            #self.wfile.flush()
         
         if chunked:
             self.wfile.write('0\r\n\r\n'.encode('utf-8'))

Reply via email to