This is an automated email from the ASF dual-hosted git repository.

ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cd6cce  http: Compressed response example in Python (#35)
8cd6cce is described below

commit 8cd6cce55eab7d665124959cc25e7531125869b3
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Wed Nov 27 12:30:00 2024 -0300

    http: Compressed response example in Python (#35)
    
    * http: Compressed response example in Python
    
    * complete the chunked response loop
    
    * more strict list of available compressors
    
    * simplify config
    
    * better names
    
    * turns out I can use for..in in this loop as well
    
    * fix indent
    
    * don't pick gzip as default when it's not in AVAILABLE_CODINGS
    
    * suggest default filename
    
    * fix brotli file extension
    
    * expand README with note about simpler Accept-Encoding headers
    
    * Add client.py
    
    * reduce buffering and reduce latency
    
    * expedite the yielding of the first buffer
    
    * expand README
    
    * remove test code
    
    * add an option to use dictionary-encoded string column
    
    * readme: add note about IPC compression codec negotiation
    
    * remove BUFFER_ENTIRE_RESPONSE option
    
    * write a parser based on a tokenizer
    
    * make parser generic to Accept and Accept-Encoding
    
    * support IPC buffer compression based on Accept header
    
    * return codec in header
    
    * extend client.py cases
    
    * Update paragraph about double-compression
    
    * Fix typo in README
    
    * Add note about meaning and interpretation of Content-Type
    
    * fix typo
    
    * Apply suggestions from code review
    
    * README.md: Break long lines
    
    * Move make_requests.sh to curl/client.sh
    
    * Add README files to sub directories
    
    * Improve python/server/README.md
    
    * Improve python/client/README.md
    
    * Improve python/client/README.md
---
 http/get_compressed/README.md                      | 166 +++++-
 http/get_compressed/curl/client/README.md          |  80 +++
 http/get_compressed/curl/client/client.sh          |  46 ++
 http/get_compressed/{ => python/client}/README.md  |  14 +-
 http/get_compressed/python/client/client.py        |  96 ++++
 http/get_compressed/{ => python/server}/README.md  |  14 +-
 http/get_compressed/python/server/server.py        | 564 +++++++++++++++++++++
 .../python/client/urllib.request/client.py         |   2 +-
 8 files changed, 976 insertions(+), 6 deletions(-)

diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index dde6e20..c8ea47d 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -19,4 +19,168 @@
 
 # HTTP GET Arrow Data: Compression Examples
 
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+This directory contains examples of HTTP servers/clients that transmit/receive
+data in the Arrow IPC streaming format and use compression (in various ways) to
+reduce the size of the transmitted data.
+
+Since we re-use the [Arrow IPC format][ipc] for transferring Arrow data over
+HTTP and both Arrow IPC and HTTP standards support compression on their own,
+there are at least two approaches to this problem:
+
+1. Compressed HTTP responses carrying Arrow IPC streams with uncompressed
+   array buffers.
+2. Uncompressed HTTP responses carrying Arrow IPC streams with compressed
+   array buffers.
+
+Applying both IPC buffer and HTTP compression to the same data is not
+recommended. The extra CPU overhead of decompressing the data twice is
+not worth any possible gains that double compression might bring. If
+compression ratios are unambiguously more important than reducing CPU
+overhead, then a different compression algorithm that optimizes for that can
+be chosen.
+
+This table shows the support for different compression algorithms in HTTP and
+Arrow IPC:
+
+| Codec      | Identifier  | HTTP Support  | IPC Support  |
+|----------- | ----------- | ------------- | ------------ |
+| GZip       | `gzip`      | X             |              |
+| DEFLATE    | `deflate`   | X             |              |
+| Brotli     | `br`        | X[^2]         |              |
+| Zstandard  | `zstd`      | X[^2]         | X[^3]        |
+| LZ4        | `lz4`       |               | X[^3]        |
+
+Since not all Arrow IPC implementations support compression, HTTP compression
+based on accepted formats negotiated with the client is a great way to increase
+the chances of efficient data transfer.
+
+Servers may check the `Accept-Encoding` header of the client and choose the
+compression format in this order of preference: `zstd`, `br`, `gzip`,
+`identity` (no compression). If the client does not specify a preference, the
+only constraint on the server is the availability of the compression algorithm
+in the server environment.
+
+## Arrow IPC Compression
+
+When IPC buffer compression is preferred and servers can't assume all clients
+support it[^4], clients may be asked to explicitly list the supported compression
+algorithms in the request headers. The `Accept` header can be used for this
+since `Accept-Encoding` (and `Content-Encoding`) is used to control compression
+of the entire HTTP response stream and instruct HTTP clients (like browsers) to
+decompress the response before giving data to the application or saving the
+data.
+
+    Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
+
+This is similar to clients requesting video streams by specifying the
+container format and the codecs they support
+(e.g. `Accept: video/webm; codecs="vp8, vorbis"`).
+
+The server is allowed to choose any of the listed codecs, or not compress the
+IPC buffers at all. Uncompressed IPC buffers should always be acceptable by
+clients.
+
+If a server adopts this approach and a client does not specify any codecs in
+the `Accept` header, the server can fall back to checking `Accept-Encoding`
+header to pick a compression algorithm for the entire HTTP response stream.
+
+To make debugging easier, servers may include the chosen compression codec(s)
+in the `Content-Type` header of the response (quotes are optional):
+
+    Content-Type: application/vnd.apache.arrow.stream; codecs=zstd
+
+This is not necessary for correct decompression because the payload already
+contains information that tells the IPC reader how to decompress the buffers,
+but it can help developers understand what is going on.
+
+When programmatically checking if the `Content-Type` header contains a specific
+format, it is important to use a parser that can handle parameters or look
+only at the media type part of the header. This is not specific to the
+Arrow IPC format, but a general rule for all media types. For example,
+`application/json; charset=utf-8` should match `application/json`.
+
+When considering use of IPC buffer compression, check the [IPC format section of
+the Arrow Implementation Status page][^5] to see whether the Arrow
+implementations you are targeting support it.
+
+## HTTP/1.1 Response Compression
+
+HTTP/1.1 offers an elaborate way for clients to specify their preferred
+content encoding (read compression algorithm) using the `Accept-Encoding`
+header.[^1]
+
+At least the Python server (in [`python/`](./python)) implements a fully
+compliant parser for the `Accept-Encoding` header. Application servers may
+choose to implement a simpler check of the `Accept-Encoding` header or assume
+that the client accepts the chosen compression scheme when talking to that
+server.
+
+Here is an example of a header that a client may send and what it means:
+
+    Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0
+
+This header says that the client prefers that the server compress the
+response with `zstd`, but if that is not possible, then `brotli` and `gzip`
+are acceptable (in that order because 0.8 is greater than 0.5). The client
+does not want the response to be uncompressed. This is communicated by
+`"identity"` being listed with `q=0`.
+
+To tell the server the client only accepts `zstd` responses and nothing
+else, not even uncompressed responses, the client would send:
+
+    Accept-Encoding: zstd, *;q=0
+
+RFC 2616[^1] specifies the rules for how a server should interpret the
+`Accept-Encoding` header:
+
+    A server tests whether a content-coding is acceptable, according to
+    an Accept-Encoding field, using these rules:
+
+       1. If the content-coding is one of the content-codings listed in
+          the Accept-Encoding field, then it is acceptable, unless it is
+          accompanied by a qvalue of 0. (As defined in section 3.9, a
+          qvalue of 0 means "not acceptable.")
+
+       2. The special "*" symbol in an Accept-Encoding field matches any
+          available content-coding not explicitly listed in the header
+          field.
+
+       3. If multiple content-codings are acceptable, then the acceptable
+          content-coding with the highest non-zero qvalue is preferred.
+
+       4. The "identity" content-coding is always acceptable, unless
+          specifically refused because the Accept-Encoding field includes
+          "identity;q=0", or because the field includes "*;q=0" and does
+          not explicitly include the "identity" content-coding. If the
+          Accept-Encoding field-value is empty, then only the "identity"
+          encoding is acceptable.
+
+If you're targeting web browsers, check the compatibility table of [compression
+algorithms on MDN Web Docs][^2].
+
+Another important rule is that if the server compresses the response, it
+must include a `Content-Encoding` header in the response.
+
+    If the content-coding of an entity is not "identity", then the
+    response MUST include a Content-Encoding entity-header (section
+    14.11) that lists the non-identity content-coding(s) used.
+
+Since not all servers implement the full `Accept-Encoding` header parsing logic,
+clients tend to stick to simple header values like `Accept-Encoding: identity`
+when no compression is desired, and `Accept-Encoding: gzip, deflate, zstd, br`
+when the client supports different compression formats and is indifferent to
+which one the server chooses. Clients should expect uncompressed responses as
+well in these cases. The only way to force a "406 Not Acceptable" response when
+no compression is available is to send `identity;q=0` or `*;q=0` toward
+the end of the `Accept-Encoding` header. But that relies on the server
+implementing the full `Accept-Encoding` handling logic.
+
+
+[^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3)
+[^2]: [MDN Web Docs: Accept-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility)
+[^3]: [Arrow Columnar Format: Compression](https://arrow.apache.org/docs/format/Columnar.html#compression)
+[^4]: Web applications using the JavaScript Arrow implementation don't have
+    access to the compression APIs to decompress `zstd` and `lz4` IPC buffers.
+[^5]: [Arrow Implementation Status: IPC Format](https://arrow.apache.org/docs/status.html#ipc-format)
+
+[ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
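Illustrative sketch (not part of this commit): the four RFC 2616 rules quoted
in the README above can be approximated in a few lines of Python. This is a
simplified stand-in for the tokenizer-based parser in
`python/server/server.py`. The function names `parse_accept_encoding` and
`choose_coding` are hypothetical, and the sketch assumes well-formed headers
without quoted strings. It also always prefers an acceptable compressed coding
over `identity`, which is an assumption of this sketch rather than something
the RFC requires.

```python
def parse_accept_encoding(value):
    """Return {coding: qvalue} for a simple, well-formed Accept-Encoding value."""
    prefs = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        coding, _, params = item.partition(";")
        q = 1.0  # a coding listed without a qvalue counts as q=1
        for param in params.split(";"):
            key, _, val = param.partition("=")
            if key.strip().lower() == "q":
                q = float(val.strip())
        prefs[coding.strip().lower()] = q
    return prefs


def choose_coding(value, available):
    """Pick a coding from `available`, "identity", or None (meaning 406)."""
    prefs = parse_accept_encoding(value)
    wildcard = prefs.get("*")
    best, best_q = None, 0.0
    for coding in available:
        # rule 2: "*" covers codings that are not listed explicitly
        q = prefs.get(coding, wildcard if wildcard is not None else 0.0)
        # rules 1 and 3: q=0 is never acceptable, highest qvalue wins
        if q > best_q:
            best, best_q = coding, q
    if best is not None:
        return best
    # rule 4: identity is acceptable unless refused directly or via "*;q=0"
    identity_q = prefs.get("identity", wildcard if wildcard is not None else 1.0)
    return "identity" if identity_q > 0 else None
```

With the example header from the README, a server offering only `br` and
`gzip` would pick `br`, and a server offering none of the listed codings would
get `None` back, which corresponds to a "406 Not Acceptable" response.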
diff --git a/http/get_compressed/curl/client/README.md b/http/get_compressed/curl/client/README.md
new file mode 100644
index 0000000..6694ce0
--- /dev/null
+++ b/http/get_compressed/curl/client/README.md
@@ -0,0 +1,80 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
+
+This directory contains a simple `curl` script that issues multiple HTTP GET
+requests to the server implemented in the parent directory, negotiating
+different compression algorithms for the Arrow IPC stream data and piping the output
+to different files with extensions that indicate the compression algorithm used.
+
+To run this example, first start one of the server examples in the parent
+directory, then run the `client.sh` script.
+
+You can check all the sizes with a simple command:
+
+```bash
+$ du -sh out* | sort -gr
+816M    out.arrows
+804M    out_from_chunked.arrows
+418M    out_from_chunked.arrows+lz4
+405M    out.arrows+lz4
+257M    out.arrows.gz
+256M    out_from_chunked.arrows.gz
+229M    out_from_chunked.arrows+zstd
+229M    out.arrows+zstd
+220M    out.arrows.zstd
+219M    out_from_chunked.arrows.zstd
+ 39M    out_from_chunked.arrows.br
+ 38M    out.arrows.br
+```
+
+> [!WARNING]
+> Better compression is not the only relevant metric as it might come with a
+> trade-off in terms of CPU usage. The best compression algorithm for your use
+> case will depend on your specific requirements.
+
+## Meaning of the file extensions
+
+Files produced by HTTP/1.0 requests are not chunked; they get buffered in memory
+at the server before being sent to the client. If compressed, they end up
+slightly smaller than the results of chunked responses, but the extra delay to
+the first byte is not worth it in most cases.
+
+ - `out.arrows` (Uncompressed)
+ - `out.arrows.gz` (Gzip HTTP compression)
+ - `out.arrows.zstd` (Zstandard HTTP compression)
+ - `out.arrows.br` (Brotli HTTP compression)
+
+ - `out.arrows+zstd` (Zstandard IPC compression)
+ - `out.arrows+lz4` (LZ4 IPC compression)
+
+Responses to HTTP/1.1 requests are sent by the server with `Transfer-Encoding: chunked`
+so the data goes out in smaller chunks that are written to the socket as soon as they
+are ready. This is useful for large responses that take a long time to generate,
+at the cost of a small overhead caused by the independent compression of each
+chunk.
+
+ - `out_from_chunked.arrows` (Uncompressed)
+ - `out_from_chunked.arrows.gz` (Gzip HTTP compression)
+ - `out_from_chunked.arrows.zstd` (Zstandard HTTP compression)
+ - `out_from_chunked.arrows.br` (Brotli HTTP compression)
+
+ - `out_from_chunked.arrows+lz4` (LZ4 IPC compression)
+ - `out_from_chunked.arrows+zstd` (Zstandard IPC compression)
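Illustrative sketch (not part of this commit): the `du` listing in the README
above is easier to compare as ratios against the uncompressed `out.arrows`.
The sizes are transcribed from that listing, which `du -h` rounds to whole
megabytes, so the ratios are approximate. The `sizes_mb` dictionary is a
hypothetical name, and only the non-chunked files are included for brevity.

```python
# Sizes in MB, copied from the `du -sh` output shown in the README.
sizes_mb = {
    "out.arrows": 816,
    "out.arrows+lz4": 405,
    "out.arrows.gz": 257,
    "out.arrows+zstd": 229,
    "out.arrows.zstd": 220,
    "out.arrows.br": 38,
}

baseline = sizes_mb["out.arrows"]
# Compression ratio = uncompressed size / compressed size.
ratios = {name: baseline / size for name, size in sizes_mb.items()}

# Print from the weakest to the strongest compression.
for name, ratio in sorted(ratios.items(), key=lambda kv: kv[1]):
    print(f"{name:<18} {ratio:5.1f}x")
```

As the README's warning says, a higher ratio alone does not make a codec the
better choice, since CPU cost is not captured by these numbers.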
diff --git a/http/get_compressed/curl/client/client.sh b/http/get_compressed/curl/client/client.sh
new file mode 100755
index 0000000..1706953
--- /dev/null
+++ b/http/get_compressed/curl/client/client.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CURL="curl --verbose"
+URI="http://localhost:8008"
+OUT_HTTP1=out.arrows
+OUT_CHUNKED=out_from_chunked.arrows
+
+# HTTP/1.0 means that response is not chunked and not compressed...
+$CURL --http1.0 -o $OUT_HTTP1 $URI
+# ...but it may be compressed with an explicitly set Accept-Encoding
+# header
+$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT_HTTP1.gz $URI
+$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT_HTTP1.zstd $URI
+$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT_HTTP1.br $URI
+# ...or with IPC buffer compression if the Accept header specifies codecs.
+$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_HTTP1+zstd $URI
+$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_HTTP1+lz4 $URI
+
+# HTTP/1.1 means compression is on by default...
+# ...but it can be refused with the Accept-Encoding: identity header.
+$CURL -H "Accept-Encoding: identity" -o $OUT_CHUNKED $URI
+# ...with gzip if no Accept-Encoding header is set.
+$CURL -o $OUT_CHUNKED.gz $URI
+# ...or with the compression algorithm specified in the Accept-Encoding.
+$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT_CHUNKED.zstd $URI
+$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT_CHUNKED.br $URI
+# ...or with IPC buffer compression if the Accept header specifies codecs.
+$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_CHUNKED+zstd $URI
+$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_CHUNKED+lz4 $URI
diff --git a/http/get_compressed/README.md b/http/get_compressed/python/client/README.md
similarity index 64%
copy from http/get_compressed/README.md
copy to http/get_compressed/python/client/README.md
index dde6e20..1285a74 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/python/client/README.md
@@ -17,6 +17,16 @@
   under the License.
 -->
 
-# HTTP GET Arrow Data: Compression Examples
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
 
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+This directory contains an HTTP client implemented in Python that issues multiple
+requests to one of the server examples implemented in the parent directory,
+negotiating different compression algorithms for the Arrow IPC stream data.
+
+To run this example, first start one of the compressed server examples in the
+parent directory, then:
+
+```sh
+pip install pyarrow
+python client.py
+```
diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py
new file mode 100644
index 0000000..d09aeb0
--- /dev/null
+++ b/http/get_compressed/python/client/client.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import urllib.request
+import pyarrow as pa
+import time
+
+URI = "http://localhost:8008"
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def make_request(uri, compression):
+    coding = "identity" if compression.startswith("identity") else compression
+    # urllib.request.urlopen() always sends an HTTP/1.1 request
+    # with Accept-Encoding: identity, so we need to setup a request
+    # object with custom headers to request a specific compression
+    headers = {
+        "Accept-Encoding": f"{coding}, *;q=0",
+    }
+    if compression.startswith("identity+"):
+        # request IPC buffer compression instead of HTTP compression
+        ipc_codec = compression.split("+")[1]
+        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codecs="{ipc_codec}"'
+    request = urllib.request.Request(uri, headers=headers)
+
+    response = urllib.request.urlopen(request)
+    content_type = response.headers["Content-Type"]
+    if not content_type.startswith(ARROW_STREAM_FORMAT):
+        raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
+    if compression.startswith("identity"):
+        return response
+    # IANA nomenclature for Brotli is "br" and not "brotli"
+    compression = "brotli" if compression == "br" else compression
+    return pa.CompressedInputStream(response, compression)
+
+
+def request_and_process(uri, compression):
+    batches = []
+    log_prefix = f"{'[' + compression + ']':>10}:"
+    print(
+        f"{log_prefix} Requesting data from {uri} with `{compression}` compression strategy."
+    )
+    start_time = time.time()
+    response = make_request(uri, compression)
+    with pa.ipc.open_stream(response) as reader:
+        schema = reader.schema
+        time_to_schema = time.time() - start_time
+        try:
+            batch = reader.read_next_batch()
+            time_to_first_batch = time.time() - start_time
+            batches.append(batch)
+            while True:
+                batch = reader.read_next_batch()
+                batches.append(batch)
+        except StopIteration:
+            pass
+        processing_time = time.time() - start_time
+        reader_stats = reader.stats
+    print(
+        f"{log_prefix} Schema received in {time_to_schema:.3f} seconds."
+        f" schema=({', '.join(schema.names)})."
+    )
+    print(
+        f"{log_prefix} First batch received and processed in"
+        f" {time_to_first_batch:.3f} seconds"
+    )
+    print(
+        f"{log_prefix} Processing of all batches completed in"
+        f" {processing_time:.3f} seconds."
+    )
+    print(f"{log_prefix}", reader_stats)
+    return batches
+
+
+# HTTP compression
+request_and_process(URI, "identity")
+request_and_process(URI, "zstd")
+request_and_process(URI, "br")
+request_and_process(URI, "gzip")
+# using IPC buffer compression instead of HTTP compression
+request_and_process(URI, "identity+zstd")
+request_and_process(URI, "identity+lz4")
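Illustrative sketch (not part of this commit): `make_request` above validates
the response with `content_type.startswith(...)`, which is enough for this
server. A slightly stricter check, in line with the README's advice to look
only at the media type part of `Content-Type`, separates the media type from
its parameters first. `split_content_type` is a hypothetical helper. It
assumes parameter values never contain `;`, which holds for the `codecs`
values used in these examples.

```python
ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"


def split_content_type(value):
    """Split a Content-Type value into (media_type, params)."""
    media_type, _, rest = value.partition(";")
    params = {}
    for part in rest.split(";"):
        key, sep, val = part.partition("=")
        if sep:
            # parameter names are case-insensitive; quotes are optional
            params[key.strip().lower()] = val.strip().strip('"')
    return media_type.strip().lower(), params


media_type, params = split_content_type(
    "application/vnd.apache.arrow.stream; codecs=zstd"
)
if media_type != ARROW_STREAM_FORMAT:
    raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {media_type}")
# The codecs parameter is informational only; the IPC reader finds the
# buffer compression codec in the stream itself.
print(params.get("codecs"))
```

Unlike a prefix check, comparing the parsed media type for equality would
reject a hypothetical type that merely starts with the same string.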
diff --git a/http/get_compressed/README.md b/http/get_compressed/python/server/README.md
similarity index 70%
copy from http/get_compressed/README.md
copy to http/get_compressed/python/server/README.md
index dde6e20..cf4bed7 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/python/server/README.md
@@ -17,6 +17,16 @@
   under the License.
 -->
 
-# HTTP GET Arrow Data: Compression Examples
 
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
+
+This directory contains an example of an HTTP server implemented in Python
+able to serve Arrow IPC streams compressed with different algorithms negotiated
+with the client via different standard HTTP headers.
+
+To run this example:
+
+```sh
+pip install pyarrow
+python server.py
+```
diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py
new file mode 100644
index 0000000..c82c551
--- /dev/null
+++ b/http/get_compressed/python/server/server.py
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import choice, randint
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import io
+import pyarrow as pa
+import pyarrow.compute as pc
+import re
+import socketserver
+import string
+
+# use dictionary encoding for the ticker column
+USE_DICTIONARY_ENCODING = True
+
+
+def random_string(alphabet, length):
+    return "".join(choice(alphabet) for _ in range(length))
+
+
+def random_name(initial):
+    length = randint(3, 7)
+    return initial + random_string(string.ascii_lowercase, length)
+
+
+def example_tickers(num_tickers):
+    tickers = []
+    while len(tickers) < num_tickers:
+        length = randint(3, 4)
+        random_ticker = random_string(string.ascii_uppercase, length)
+        if random_ticker not in tickers:
+            tickers.append(random_ticker)
+    return tickers
+
+
+the_ticker_type = (
+    pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else pa.utf8()
+)
+the_schema = pa.schema(
+    [
+        ("ticker", the_ticker_type),
+        ("price", pa.int64()),
+        ("volume", pa.int64()),
+    ]
+)
+
+
+def example_batch(tickers, length):
+    ticker_indices = []
+    price = []
+    volume = []
+    for _ in range(length):
+        ticker_indices.append(randint(0, len(tickers) - 1))
+        price.append(randint(1, 1000) * 100)
+        volume.append(randint(1, 10000))
+    ticker = (
+        pa.DictionaryArray.from_arrays(ticker_indices, tickers)
+        if USE_DICTIONARY_ENCODING
+        else pc.take(tickers, ticker_indices, boundscheck=False)
+    )
+    return pa.RecordBatch.from_arrays([ticker, price, volume], schema=the_schema)
+
+
+def example_batches(tickers):
+    # these parameters are chosen to generate a response
+    # of ~1 GB and chunks of ~140 KB (uncompressed)
+    total_records = 42_000_000
+    batch_len = 6 * 1024
+    # all the batches sent are random slices of the larger base batch
+    base_batch = example_batch(tickers, length=8 * batch_len)
+    batches = []
+    records = 0
+    while records < total_records:
+        length = min(batch_len, total_records - records)
+        offset = randint(0, base_batch.num_rows - length - 1)
+        batch = base_batch.slice(offset, length)
+        batches.append(batch)
+        records += length
+    return batches
+
+
+# end of example data generation
+
+# what the HTTP spec calls a token (any character except CTLs or separators)
+TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)"
+# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616)
+LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*"
+TOKENIZER_PAT = re.compile(
+    f"(?P<TOK>{TOKEN_RE})"
+    r'|(?P<QUOTED>"([^"\\]|\\.)*")'  # a quoted string (escaped pairs allowed)
+    r"|(?P<COMMA>,)"
+    r"|(?P<SEMI>;)"
+    r"|(?P<EQ>=)"
+    f"|(?P<SKIP>{LWS_RE})"  # LWS is skipped
+    r"|(?P<MISMATCH>.+)",
+    flags=re.ASCII,  # HTTP headers are encoded in ASCII
+)
+
+
+def parse_header_value(header_name, header_value):
+    """
+    Parse the Accept or Accept-Encoding request header values.
+
+    Returns
+    -------
+    list of (str, dict)
+        The list of lowercase tokens and their parameters in the order they
+        appear in the header. The parameters are stored in a dictionary where
+        the keys are the parameter names and the values are the parameter
+        values. If a parameter is not followed by an equal sign and a value,
+        the value is None.
+    """
+
+    def unexpected(label, value):
+        msg = f"Malformed {header_name} header: unexpected {label} at {value!r}"
+        return ValueError(msg)
+
+    def tokenize():
+        for mo in re.finditer(TOKENIZER_PAT, header_value):
+            kind = mo.lastgroup
+            if kind == "SKIP":
+                continue
+            elif kind == "MISMATCH":
+                raise unexpected("character", mo.group())
+            yield (kind, mo.group())
+
+    tokens = tokenize()
+
+    def expect(expected_kind):
+        kind, text = next(tokens)
+        if kind != expected_kind:
+            raise unexpected("token", text)
+        return text
+
+    accepted = []
+    while True:
+        try:
+            name, params = None, {}
+            name = expect("TOK").lower()
+            kind, text = next(tokens)
+            while True:
+                if kind == "COMMA":
+                    accepted.append((name, params))
+                    break
+                if kind == "SEMI":
+                    ident = expect("TOK")
+                    params[ident] = None  # init param to None
+                    kind, text = next(tokens)
+                    if kind != "EQ":
+                        continue
+                    kind, text = next(tokens)
+                    if kind in ["TOK", "QUOTED"]:
+                        if kind == "QUOTED":
+                            text = text[1:-1]  # remove the quotes
+                        params[ident] = text  # set param to value
+                        kind, text = next(tokens)
+                        continue
+                raise unexpected("token", text)
+        except StopIteration:
+            break
+    if name is not None:
+        # any unfinished ;param=value sequence or trailing separators are ignored
+        accepted.append((name, params))
+    return accepted
+
+
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def pick_ipc_codec(accept_header, available, default):
+    """
+    Pick the IPC stream codec according to the Accept header.
+
+    This is used when deciding which codec to use for compression of IPC buffer
+    streams. This is a feature of the Arrow IPC stream format and is different
+    from the HTTP content-coding used to compress the entire HTTP response.
+
+    This is how a client may specify the IPC buffer compression codecs it
+    accepts:
+
+        Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
+
+    Parameters
+    ----------
+    accept_header : str|None
+        The value of the Accept header from an HTTP request.
+    available : list of str
+        The codecs that the server can provide in the order preferred by the
+        server. Example: ["zstd", "lz4"].
+    default : str|None
+        The codec to use if the client does not specify the ";codecs" parameter
+        in the Accept header.
+
+    Returns
+    -------
+    str|None
+        The codec that the server should use to compress the IPC buffer stream.
+        None if the client does not accept any of the available codecs
+        explicitly listed. ;codecs="" means no codecs are accepted.
+        If the client does not specify the codecs parameter, the default codec
+        is returned.
+    """
+    did_specify_codecs = False
+    accepted_codecs = []
+    if accept_header is not None:
+        accepted = parse_header_value("Accept", accept_header)
+        for media_range, params in accepted:
+            if (
+                media_range == "*/*"
+                or media_range == "application/*"
+                or media_range == ARROW_STREAM_FORMAT
+            ):
+                did_specify_codecs = "codecs" in params
+                codecs_str = params.get("codecs")
+                if codecs_str is None:
+                    continue
+                for codec in codecs_str.split(","):
+                    accepted_codecs.append(codec.strip())
+
+    for codec in available:
+        if codec in accepted_codecs:
+            return codec
+    return None if did_specify_codecs else default
+
+
+def pick_coding(accept_encoding_header, available):
+    """
+    Pick the content-coding according to the Accept-Encoding header.
+
+    This is used when using HTTP response compression instead of IPC buffer
+    compression.
+
+    Parameters
+    ----------
+    accept_encoding_header : str
+        The value of the Accept-Encoding header from an HTTP request.
+    available : list of str
+        The content-codings that the server can provide in the order preferred
+        by the server. Example: ["zstd", "br", "gzip"].
+
+    Returns
+    -------
+    str|None
+        The content-coding that the server should use to compress the response.
+        "identity" is returned if no acceptable content-coding is found in the
+        list of available codings.
+
+        None if the client does not accept any of the available content-codings
+        and doesn't accept "identity" (uncompressed) either. In this case,
+        a "406 Not Acceptable" response should be sent.
+    """
+    accepted = parse_header_value("Accept-Encoding", accept_encoding_header)
+
+    def qvalue_or(params, default):
+        qvalue = params.get("q")
+        if qvalue is not None:
+            try:
+                return float(qvalue)
+            except ValueError:
+                raise ValueError(f"Invalid qvalue in Accept-Encoding header: {qvalue}")
+        return default
+
+    if "identity" not in available:
+        available = available + ["identity"]
+    state = {}
+    for coding, params in accepted:
+        qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0)
+        if coding == "*":
+            for coding in available:
+                if coding not in state:
+                    state[coding] = qvalue
+        elif coding in available:
+            state[coding] = qvalue
+    # "identity" is always acceptable unless explicitly refused (;q=0)
+    if "identity" not in state:
+        state["identity"] = 0.0001
+    # all the candidate codings are now in the state dictionary and we
+    # have to consider only the ones that have the maximum qvalue
+    max_qvalue = max(state.values())
+    if max_qvalue == 0.0:
+        return None
+    for coding in available:
+        if coding in state and state[coding] == max_qvalue:
+            return coding
+    return None
+
+
+def pick_compression(headers, available_ipc_codecs, available_codings, default):
+    """
+    Pick the compression strategy based on the Accept and Accept-Encoding headers.
+
+    Parameters
+    ----------
+    headers : dict
+        The HTTP request headers.
+    available_ipc_codecs : list of str
+        The codecs that the server can provide for IPC buffer compression.
+    available_codings : list of str
+        The content-codings that the server can provide for HTTP response
+        compression.
+    default : str
+        The default compression strategy to use if the client does not
+        explicitly choose one.
+
+    Returns
+    -------
+    str|None
+        The compression strategy to use. It can be one of the following:
+        "identity": no compression at all.
+        "identity+zstd": No HTTP compression + IPC buffer compression with Zstd.
+        "identity+lz4": No HTTP compression + IPC buffer compression with LZ4.
+        "zstd", "br", "gzip", ...: HTTP compression without IPC buffer compression.
+        None means a "406 Not Acceptable" response should be sent.
+    """
+    accept = headers.get("Accept")
+    ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None)
+    if ipc_codec is None:
+        accept_encoding = headers.get("Accept-Encoding")
+        return (
+            default
+            if accept_encoding is None
+            else pick_coding(accept_encoding, available_codings)
+        )
+    return "identity+" + ipc_codec
+
+
+class LateClosingBytesIO(io.BytesIO):
+    """
+    BytesIO that does not close on close().
+
+    When a stream wrapping a file-like object is closed, the underlying
+    file-like object is also closed. This subclass prevents that from
+    happening by overriding the close method.
+
+    If we close a RecordBatchStreamWriter wrapping a BytesIO object, we want
+    to be able to create a memory view of the buffer. But that is only possible
+    if the BytesIO object is not closed yet.
+    """
+
+    def close(self):
+        pass
+
+    def close_now(self):
+        super().close()
+
+
+class SocketWriterSink(socketserver._SocketWriter):
+    """Wrapper to make wfile usable as a sink for Arrow stream writing."""
+
+    def __init__(self, wfile):
+        self.wfile = wfile
+
+    def writable(self):
+        return True
+
+    def write(self, b):
+        self.wfile.write(b)
+
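+    def fileno(self):
+        # self._sock is never set because __init__ does not call the parent
+        # constructor, so delegate to the wrapped wfile instead
+        return self.wfile.fileno()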
+
+    def close(self):
+        """Do nothing so Arrow stream wrappers don't close the socket."""
+        pass
+
+
+def generate_chunk_buffers(schema, source, compression):
+    # the sink holds the buffer and we give a view of it to the caller
+    with LateClosingBytesIO() as sink:
+        # keep buffering until we have at least MIN_BUFFER_SIZE bytes
+        # in the buffer before yielding it to the caller. Setting it
+        # to 1 means we yield as soon as the compression blocks are
+        # formed and reach the sink buffer.
+        MIN_BUFFER_SIZE = 64 * 1024
+        if compression.startswith("identity"):
+            if compression == "identity+zstd":
+                options = pa.ipc.IpcWriteOptions(compression="zstd")
+            elif compression == "identity+lz4":
+                options = pa.ipc.IpcWriteOptions(compression="lz4")
+            else:
+                options = None
+            # source: RecordBatchReader
+            #   |> writer: RecordBatchStreamWriter
+            #   |> sink: LateClosingBytesIO
+            writer = pa.ipc.new_stream(sink, schema, options=options)
+            for batch in source:
+                writer.write_batch(batch)
+                if sink.tell() >= MIN_BUFFER_SIZE:
+                    sink.truncate()
+                    with sink.getbuffer() as buffer:
+                        yield buffer
+                    sink.seek(0)
+
+            writer.close()  # write EOS marker and flush
+        else:
+            compression = "brotli" if compression == "br" else compression
+            with pa.CompressedOutputStream(sink, compression) as compressor:
+                # has the first buffer been yielded already?
+                sent_first = False
+                # source: RecordBatchReader
+                #   |> writer: RecordBatchStreamWriter
+                #   |> compressor: CompressedOutputStream
+                #   |> sink: LateClosingBytesIO
+                writer = pa.ipc.new_stream(compressor, schema)
+                for batch in source:
+                    writer.write_batch(batch)
+                    # we try to yield a buffer ASAP no matter how small
+                    if not sent_first and sink.tell() == 0:
+                        compressor.flush()
+                    pos = sink.tell()
+                    if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1):
+                        sink.truncate()
+                        with sink.getbuffer() as buffer:
+                            yield buffer
+                        sink.seek(0)
+                        sent_first = True
+
+                writer.close()  # write EOS marker and flush
+                compressor.close()
+
+        sink.truncate()
+        with sink.getbuffer() as buffer:
+            yield buffer
+        sink.close_now()
+
+
+AVAILABLE_IPC_CODECS = ["zstd", "lz4"]
+"""List of available codecs for Arrow IPC buffer compression."""
+
+AVAILABLE_CODINGS = ["zstd", "br", "gzip"]
+"""
+List of available content-codings as used in HTTP.
+
+Note that Arrow stream classes refer to Brotli as "brotli" and not "br".
+"""
+
+
+class MyRequestHandler(BaseHTTPRequestHandler):
+    """
+    Response handler for a simple HTTP server.
+
+    This HTTP request handler serves a compressed HTTP response with an Arrow
+    stream in it or a (TODO) compressed Arrow stream in an uncompressed HTTP
+    response.
+
+    The Arrow data is randomly generated "trading data" with a schema consisting
+    of a ticker, price (in cents), and volume.
+    """
+
+    def _resolve_batches(self):
+        return pa.RecordBatchReader.from_batches(the_schema, all_batches)
+
+    def _send_not_acceptable(self, parsing_error=None):
+        self.send_response(406, "Not Acceptable")
+        self.send_header("Content-Type", "text/plain")
+        self.end_headers()
+        if parsing_error:
+            message = f"Error parsing header: {parsing_error}\n"
+        else:
+            message = "None of the available codings are accepted by this client.\n"
+        accept = self.headers.get("Accept")
+        if accept is not None:
+            message += f"`Accept` header was {accept!r}.\n"
+        accept_encoding = self.headers.get("Accept-Encoding")
+        if accept_encoding is not None:
+            message += f"`Accept-Encoding` header was {accept_encoding!r}.\n"
+        self.wfile.write(bytes(message, "utf-8"))
+
+    def do_GET(self):
+        # HTTP/1.0 requests don't get chunked responses
+        if self.request_version == "HTTP/1.0":
+            self.protocol_version = "HTTP/1.0"
+            chunked = False
+        else:
+            self.protocol_version = "HTTP/1.1"
+            chunked = True
+
+        # if client's intent cannot be derived from the headers, return
+        # uncompressed data for HTTP/1.0 requests and compressed data for
+        # HTTP/1.1 requests with the safest compression format choice: "gzip".
+        default_compression = (
+            "identity"
+            if self.request_version == "HTTP/1.0" or ("gzip" not in AVAILABLE_CODINGS)
+            else "gzip"
+        )
+        try:
+            compression = pick_compression(
+                self.headers,
+                AVAILABLE_IPC_CODECS,
+                AVAILABLE_CODINGS,
+                default_compression,
+            )
+            if compression is None:
+                self._send_not_acceptable()
+                return
+        except ValueError as e:
+            self._send_not_acceptable(str(e))
+            return
+
+        ### in a real application the data would be resolved from a database or
+        ### another source like a file and error handling would be done here
+        ### before the 200 OK response starts being sent to the client.
+        source = self._resolve_batches()
+
+        self.send_response(200)
+        ### set these headers if testing with a local browser-based client:
+        # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008')
+        # self.send_header('Access-Control-Allow-Methods', 'GET')
+        # self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        self.send_header(
+            "Content-Type",
+            (
+                f"{ARROW_STREAM_FORMAT}; codecs={compression[9:]}"
+                if compression.startswith("identity+")
+                else ARROW_STREAM_FORMAT
+            ),
+        )
+        # suggest a default filename in case this response is saved by the user
+        self.send_header("Content-Disposition", r'attachment; filename="output.arrows"')
+
+        if not compression.startswith("identity"):
+            self.send_header("Content-Encoding", compression)
+        if chunked:
+            self.send_header("Transfer-Encoding", "chunked")
+            self.end_headers()
+            for buffer in generate_chunk_buffers(the_schema, source, compression):
+                self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8"))
+                self.wfile.write(buffer)
+                self.wfile.write("\r\n".encode("utf-8"))
+            self.wfile.write("0\r\n\r\n".encode("utf-8"))
+        else:
+            self.end_headers()
+            sink = SocketWriterSink(self.wfile)
+            for buffer in generate_chunk_buffers(the_schema, source, compression):
+                sink.write(buffer)
+
+
+print("Generating example data...")
+
+all_tickers = example_tickers(60)
+all_batches = example_batches(all_tickers)
+
+server_address = ("localhost", 8008)
+try:
+    httpd = HTTPServer(server_address, MyRequestHandler)
+    print(f"Serving on {server_address[0]}:{server_address[1]}...")
+    httpd.serve_forever()
+except KeyboardInterrupt:
+    print("Shutting down server")
+    httpd.socket.close()
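
Note on the chunked branch of do_GET above: it writes the HTTP/1.1 chunked
transfer coding by hand (hex size, CRLF, payload, CRLF, then a zero-size
chunk to end the body). The sketch below isolates that framing so it can be
checked without a socket. The names frame_chunk, frame_body and LAST_CHUNK
are illustrative only and are not part of this commit. Skipping empty
buffers is an added assumption, made because a zero-size chunk would be
read as the end of the body.

```python
LAST_CHUNK = b"0\r\n\r\n"  # zero-size chunk followed by an empty trailer section


def frame_chunk(payload: bytes) -> bytes:
    """Frame one chunk: size in hexadecimal, CRLF, payload, CRLF."""
    return f"{len(payload):X}\r\n".encode("ascii") + payload + b"\r\n"


def frame_body(buffers) -> bytes:
    """Frame an iterable of bytes-like buffers as a complete chunked body.

    Hypothetical helper: empty buffers are skipped (an assumption of this
    sketch) so that the terminating chunk is only emitted once, at the end.
    """
    framed = [frame_chunk(bytes(b)) for b in buffers if len(b) > 0]
    return b"".join(framed) + LAST_CHUNK
```

A server that streams would write each framed chunk as soon as it is
produced rather than joining them; the join here only keeps the sketch
easy to test.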
diff --git a/http/get_simple/python/client/urllib.request/client.py b/http/get_simple/python/client/urllib.request/client.py
index 1d5d198..a2f24de 100644
--- a/http/get_simple/python/client/urllib.request/client.py
+++ b/http/get_simple/python/client/urllib.request/client.py
@@ -25,7 +25,7 @@ start_time = time.time()
 
 response = urllib.request.urlopen('http://localhost:8008')
 content_type = response.headers['Content-Type']
-if content_type != ARROW_STREAM_FORMAT:
+if not content_type.startswith(ARROW_STREAM_FORMAT):
     raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
 
 batches = []
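
Note on the client change above: the server in this commit may append a
codecs parameter to Content-Type, so the exact comparison was relaxed to a
prefix check. A stricter alternative, sketched below under the assumption
that only the media type and the codecs parameter matter, splits the header
with the standard library's email.message.Message. split_content_type is a
hypothetical helper and is not part of this commit.

```python
from email.message import Message

ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"


def split_content_type(value):
    """Return (media_type, codecs) parsed from a Content-Type header value.

    Hypothetical helper: codecs is None when the parameter is absent.
    """
    msg = Message()
    msg["Content-Type"] = value
    # get_content_type() lower-cases the media type and drops the parameters
    return msg.get_content_type(), msg.get_param("codecs")


media_type, codecs = split_content_type(f"{ARROW_STREAM_FORMAT}; codecs=zstd")
if media_type != ARROW_STREAM_FORMAT:
    raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {media_type}")
```

Unlike the prefix check, comparing the parsed media type rejects a value
that merely begins with the expected string.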

