This is an automated email from the ASF dual-hosted git repository.
ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new 8cd6cce http: Compressed response example in Python (#35)
8cd6cce is described below
commit 8cd6cce55eab7d665124959cc25e7531125869b3
Author: Felipe Oliveira Carvalho <[email protected]>
AuthorDate: Wed Nov 27 12:30:00 2024 -0300
http: Compressed response example in Python (#35)
* http: Compressed response example in Python
* complete the chunked response loop
* more strict list of available compressors
* simplify config
* better names
* turns out I can use for..in in this loop as well
* fix indent
* don't pick gzip as default when it's not in AVAILABLE_CODINGS
* suggest default filename
* fix brotli file extension
* expand README with note about simpler Accept-Encoding headers
* Add client.py
* reduce buffering and reduce latency
* expedite the yielding of the first buffer
* expand README
* remove test code
* add an option to use dictionary-encoded string column
* readme: add note about IPC compression codec negotiation
* remove BUFFER_ENTIRE_RESPONSE option
* write a parser based on a tokenizer
* make parser generic to Accept and Accept-Encoding
* support IPC buffer compression based on Accept header
* return codec in header
* extend client.py cases
* Update paragraph about double-compression
* Fix typo in README
* Add note about meaning and interpretation of Content-Type
* fix typo
* Apply suggestions from code review
* README.md: Break long lines
* Move make_requests.sh to curl/client.sh
* Add README files to sub directories
* Improve python/server/README.md
* Improve python/client/README.md
* Improve python/client/README.md
---
http/get_compressed/README.md | 166 +++++-
http/get_compressed/curl/client/README.md | 80 +++
http/get_compressed/curl/client/client.sh | 46 ++
http/get_compressed/{ => python/client}/README.md | 14 +-
http/get_compressed/python/client/client.py | 96 ++++
http/get_compressed/{ => python/server}/README.md | 14 +-
http/get_compressed/python/server/server.py | 564 +++++++++++++++++++++
.../python/client/urllib.request/client.py | 2 +-
8 files changed, 976 insertions(+), 6 deletions(-)
diff --git a/http/get_compressed/README.md b/http/get_compressed/README.md
index dde6e20..c8ea47d 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/README.md
@@ -19,4 +19,168 @@
# HTTP GET Arrow Data: Compression Examples
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+This directory contains examples of HTTP servers/clients that transmit/receive
+data in the Arrow IPC streaming format and use compression (in various ways) to
+reduce the size of the transmitted data.
+
+Since we re-use the [Arrow IPC format][ipc] for transferring Arrow data over
+HTTP and both Arrow IPC and HTTP standards support compression on their own,
+there are at least two approaches to this problem:
+
+1. Compressed HTTP responses carrying Arrow IPC streams with uncompressed
+ array buffers.
+2. Uncompressed HTTP responses carrying Arrow IPC streams with compressed
+ array buffers.
+
+Applying both IPC buffer and HTTP compression to the same data is not
+recommended. The extra CPU overhead of decompressing the data twice is
+not worth any possible gains that double compression might bring. If
+compression ratios are unambiguously more important than reducing CPU
+overhead, then a different compression algorithm that optimizes for that can
+be chosen.
+
+This table shows the support for different compression algorithms in HTTP and
+Arrow IPC:
+
+| Codec | Identifier | HTTP Support | IPC Support |
+|----------- | ----------- | ------------- | ------------ |
+| GZip | `gzip` | X | |
+| DEFLATE | `deflate` | X | |
+| Brotli | `br` | X[^2] | |
+| Zstandard | `zstd` | X[^2] | X[^3] |
+| LZ4 | `lz4` | | X[^3] |
+
+Since not all Arrow IPC implementations support compression, HTTP compression
+based on accepted formats negotiated with the client is a great way to increase
+the chances of efficient data transfer.
+
+Servers may check the `Accept-Encoding` header of the client and choose the
+compression format in this order of preference: `zstd`, `br`, `gzip`,
+`identity` (no compression). If the client does not specify a preference, the
+only constraint on the server is the availability of the compression algorithm
+in the server environment.
+
+## Arrow IPC Compression
+
+When IPC buffer compression is preferred and servers can't assume all clients
+support it[^4], clients may be asked to explicitly list the supported compression
+algorithms in the request headers. The `Accept` header can be used for this
+since `Accept-Encoding` (and `Content-Encoding`) is used to control compression
+of the entire HTTP response stream and instruct HTTP clients (like browsers) to
+decompress the response before giving data to the application or saving the
+data.
+
+    Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
+
+This is similar to clients requesting video streams by specifying the
+container format and the codecs they support
+(e.g. `Accept: video/webm; codecs="vp8, vorbis"`).
+
+The server is allowed to choose any of the listed codecs, or not compress the
+IPC buffers at all. Uncompressed IPC buffers should always be acceptable by
+clients.
+
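As a rough illustration of the `codecs` negotiation described above, the following Python sketch extracts the codecs a client lists for the Arrow stream media type and picks the first one the server supports. It is a simplified stand-in, not the parser used by the server in this commit: the names `requested_ipc_codecs` and `choose_ipc_codec` are hypothetical, and the sketch assumes there are no wildcard media ranges, no q-values, and no semicolons inside quoted strings.

```python
import re

ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"

# Matches commas that are not inside a double-quoted string.
_COMMA_OUTSIDE_QUOTES = re.compile(r',(?=(?:[^"]*"[^"]*")*[^"]*$)')


def requested_ipc_codecs(accept_header):
    """Return the codecs listed for the Arrow stream media type (hypothetical helper)."""
    for media_range in _COMMA_OUTSIDE_QUOTES.split(accept_header):
        media_type, *params = (part.strip() for part in media_range.split(";"))
        if media_type.lower() != ARROW_STREAM_FORMAT:
            continue
        for param in params:
            key, _, value = param.partition("=")
            if key.strip().lower() == "codecs":
                # quotes around the codec list are optional
                codecs = value.strip().strip('"').split(",")
                return [codec.strip() for codec in codecs if codec.strip()]
    return []


def choose_ipc_codec(accept_header, available):
    """Pick the first server-preferred codec the client listed, or None (uncompressed)."""
    requested = requested_ipc_codecs(accept_header)
    for codec in available:
        if codec in requested:
            return codec
    return None
```

Returning `None` here stands for "send uncompressed IPC buffers", which the text above says clients should always accept.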
+If a server adopts this approach and a client does not specify any codecs in
+the `Accept` header, the server can fall back to checking `Accept-Encoding`
+header to pick a compression algorithm for the entire HTTP response stream.
+
+To make debugging easier servers may include the chosen compression codec(s)
+in the `Content-Type` header of the response (quotes are optional):
+
+    Content-Type: application/vnd.apache.arrow.stream; codecs=zstd
+
+This is not necessary for correct decompression because the payload already
+contains information that tells the IPC reader how to decompress the buffers,
+but it can help developers understand what is going on.
+
+When programmatically checking if the `Content-Type` header contains a specific
+format, it is important to use a parser that can handle parameters or look
+only at the media type part of the header. This is not specific to the
+Arrow IPC format, but a general rule for all media types. For example,
+`application/json; charset=utf-8` should match `application/json`.
+
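The media type check described above can be sketched in a few lines of Python. This is a minimal illustration rather than code from this commit: the helper names `media_type_of` and `is_arrow_stream` are made up for the example, and the sketch only splits off parameters without validating them.

```python
def media_type_of(content_type):
    """Return the lowercase media type, ignoring parameters such as charset or codecs."""
    return content_type.split(";", 1)[0].strip().lower()


def is_arrow_stream(content_type):
    """Compare only the media type part against the Arrow IPC stream media type."""
    return media_type_of(content_type) == "application/vnd.apache.arrow.stream"
```

Comparing the extracted media type for equality avoids a pitfall of plain prefix checks, which would also accept a different media type that merely starts with the same characters.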
+When considering use of IPC buffer compression, check the [IPC format section of
+the Arrow Implementation Status page][^5] to see whether the Arrow
+implementations you are targeting support it.
+
+## HTTP/1.1 Response Compression
+
+HTTP/1.1 offers an elaborate way for clients to specify their preferred
+content encoding (read compression algorithm) using the `Accept-Encoding`
+header.[^1]
+
+At least the Python server (in [`python/`](./python)) implements a fully
+compliant parser for the `Accept-Encoding` header. Application servers may
+choose to implement a simpler check of the `Accept-Encoding` header or assume
+that the client accepts the chosen compression scheme when talking to that
+server.
+
+Here is an example of a header that a client may send and what it means:
+
+    Accept-Encoding: zstd;q=1.0, gzip;q=0.5, br;q=0.8, identity;q=0
+
+This header says that the client prefers that the server compress the
+response with `zstd`, but if that is not possible, then `brotli` and `gzip`
+are acceptable (in that order because 0.8 is greater than 0.5). The client
+does not want the response to be uncompressed. This is communicated by
+`"identity"` being listed with `q=0`.
+
+To tell the server the client only accepts `zstd` responses and nothing
+else, not even uncompressed responses, the client would send:
+
+    Accept-Encoding: zstd, *;q=0
+
+RFC 2616[^1] specifies the rules for how a server should interpret the
+`Accept-Encoding` header:
+
+    A server tests whether a content-coding is acceptable, according to
+    an Accept-Encoding field, using these rules:
+
+      1. If the content-coding is one of the content-codings listed in
+         the Accept-Encoding field, then it is acceptable, unless it is
+         accompanied by a qvalue of 0. (As defined in section 3.9, a
+         qvalue of 0 means "not acceptable.")
+
+      2. The special "*" symbol in an Accept-Encoding field matches any
+         available content-coding not explicitly listed in the header
+         field.
+
+      3. If multiple content-codings are acceptable, then the acceptable
+         content-coding with the highest non-zero qvalue is preferred.
+
+      4. The "identity" content-coding is always acceptable, unless
+         specifically refused because the Accept-Encoding field includes
+         "identity;q=0", or because the field includes "*;q=0" and does
+         not explicitly include the "identity" content-coding. If the
+         Accept-Encoding field-value is empty, then only the "identity"
+         encoding is acceptable.
+
+If you're targeting web browsers, check the compatibility table of [compression
+algorithms on MDN Web Docs][^2].
+
+Another important rule is that if the server compresses the response, it
+must include a `Content-Encoding` header in the response.
+
+    If the content-coding of an entity is not "identity", then the
+    response MUST include a Content-Encoding entity-header (section
+    14.11) that lists the non-identity content-coding(s) used.
+
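The `Content-Encoding` rule quoted above can be illustrated with a small Python sketch. It is only an illustration under simplifying assumptions: the helper `encode_body` is hypothetical, it supports only `gzip` and `identity`, and it compresses a fully buffered body rather than a stream.

```python
import gzip


def encode_body(body, coding):
    """Return (headers, payload) for a response body (hypothetical helper)."""
    headers = {"Content-Type": "application/vnd.apache.arrow.stream"}
    if coding == "identity":
        # uncompressed: no Content-Encoding header is needed
        return headers, body
    if coding == "gzip":
        # compressed: the response must say which content-coding was applied
        headers["Content-Encoding"] = "gzip"
        return headers, gzip.compress(body)
    raise ValueError(f"unsupported content-coding: {coding}")
```

A client that receives the `Content-Encoding: gzip` header knows to decompress the payload before handing it to an Arrow IPC reader.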
+Since not all servers implement the full `Accept-Encoding` header parsing logic,
+clients tend to stick to simple header values like `Accept-Encoding: identity`
+when no compression is desired, and `Accept-Encoding: gzip, deflate, zstd, br`
+when the client supports different compression formats and is indifferent to
+which one the server chooses. Clients should expect uncompressed responses as
+well in these cases. The only way to force a "406 Not Acceptable" response when
+no compression is available is to send `identity;q=0` or `*;q=0` at
+the end of the `Accept-Encoding` header. But that relies on the server
+implementing the full `Accept-Encoding` handling logic.
+
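The selection rules quoted from RFC 2616 earlier in this section can be sketched in Python as follows. This is a deliberately simplified illustration, not the tokenizer-based parser in `server.py`: the function name `pick_coding_simple` and the constant `IMPLICIT_IDENTITY_Q` are assumptions made for the example, and quoted strings and strict syntax validation are not handled.

```python
def pick_coding_simple(accept_encoding, available):
    """
    Simplified content-coding selection (hypothetical helper).

    Returns the chosen coding, "identity" for no compression, or None
    when nothing is acceptable (a 406 response would be appropriate).
    """
    # qvalue assumed for "identity" when the client does not mention it
    IMPLICIT_IDENTITY_Q = 0.0001
    qvalues = {}
    for item in accept_encoding.split(","):
        coding, _, params = item.strip().partition(";")
        coding = coding.strip().lower()
        if not coding:
            continue
        q = 1.0  # a coding listed without a qvalue is fully acceptable
        for param in params.split(";"):
            key, _, value = param.partition("=")
            if key.strip().lower() == "q":
                q = float(value)
        qvalues[coding] = q

    candidates = list(available)
    if "identity" not in candidates:
        candidates.append("identity")

    best, best_q = None, 0.0
    for coding in candidates:  # server preference order breaks ties
        if coding in qvalues:
            q = qvalues[coding]
        elif "*" in qvalues:
            q = qvalues["*"]
        elif coding == "identity":
            q = IMPLICIT_IDENTITY_Q
        else:
            q = 0.0  # not listed and no wildcard: not acceptable
        if q > best_q:
            best, best_q = coding, q
    return best
```

With the example header from this section, a server that offers `["br", "gzip"]` would pick `br`, and a server that offers no compression at all would get `None` because the client refused `identity`.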
+
+[^1]: [Fielding, R. et al. (1999). HTTP/1.1. RFC 2616, Section 14.3 Accept-Encoding.](https://www.rfc-editor.org/rfc/rfc2616#section-14.3)
+[^2]: [MDN Web Docs: Content-Encoding](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#browser_compatibility)
+[^3]: [Arrow Columnar Format: Compression](https://arrow.apache.org/docs/format/Columnar.html#compression)
+[^4]: Web applications using the JavaScript Arrow implementation don't have
+    access to the compression APIs to decompress `zstd` and `lz4` IPC buffers.
+[^5]: [Arrow Implementation Status: IPC Format](https://arrow.apache.org/docs/status.html#ipc-format)
+
+[ipc]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
diff --git a/http/get_compressed/curl/client/README.md b/http/get_compressed/curl/client/README.md
new file mode 100644
index 0000000..6694ce0
--- /dev/null
+++ b/http/get_compressed/curl/client/README.md
@@ -0,0 +1,80 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
+
+This directory contains a simple `curl` script that issues multiple HTTP GET
+requests to the server implemented in the parent directory, negotiating
+different compression algorithms for the Arrow IPC stream data and piping the output
+to different files with extensions that indicate the compression algorithm used.
+
+To run this example, first start one of the server examples in the parent
+directory, then run the `client.sh` script.
+
+You can check all the sizes with a simple command:
+
+```bash
+$ du -sh out* | sort -gr
+816M out.arrows
+804M out_from_chunked.arrows
+418M out_from_chunked.arrows+lz4
+405M out.arrows+lz4
+257M out.arrows.gz
+256M out_from_chunked.arrows.gz
+229M out_from_chunked.arrows+zstd
+229M out.arrows+zstd
+220M out.arrows.zstd
+219M out_from_chunked.arrows.zstd
+ 39M out_from_chunked.arrows.br
+ 38M out.arrows.br
+```
+
+> [!WARNING]
+> Better compression is not the only relevant metric as it might come with a
+> trade-off in terms of CPU usage. The best compression algorithm for your use
+> case will depend on your specific requirements.
+
+## Meaning of the file extensions
+
+Files produced by HTTP/1.0 requests are not chunked; they get buffered in memory
+at the server before being sent to the client. If compressed, they end up
+slightly smaller than the results of chunked responses, but the extra delay for
+first byte is not worth it in most cases.
+
+ - `out.arrows` (Uncompressed)
+ - `out.arrows.gz` (Gzip HTTP compression)
+ - `out.arrows.zstd` (Zstandard HTTP compression)
+ - `out.arrows.br` (Brotli HTTP compression)
+
+ - `out.arrows+zstd` (Zstandard IPC compression)
+ - `out.arrows+lz4` (LZ4 IPC compression)
+
+HTTP/1.1 requests are returned by the server with `Transfer-Encoding: chunked`
+to send the data in smaller chunks that are sent to the socket as soon as they
+are ready. This is useful for large responses that take a long time to generate
+at the cost of a small overhead caused by the independent compression of each
+chunk.
+
+ - `out_from_chunked.arrows` (Uncompressed)
+ - `out_from_chunked.arrows.gz` (Gzip HTTP compression)
+ - `out_from_chunked.arrows.zstd` (Zstandard HTTP compression)
+ - `out_from_chunked.arrows.br` (Brotli HTTP compression)
+
+ - `out_from_chunked.arrows+lz4` (LZ4 IPC compression)
+ - `out_from_chunked.arrows+zstd` (Zstandard IPC compression)
diff --git a/http/get_compressed/curl/client/client.sh b/http/get_compressed/curl/client/client.sh
new file mode 100755
index 0000000..1706953
--- /dev/null
+++ b/http/get_compressed/curl/client/client.sh
@@ -0,0 +1,46 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CURL="curl --verbose"
+URI="http://localhost:8008"
+OUT_HTTP1=out.arrows
+OUT_CHUNKED=out_from_chunked.arrows
+
+# HTTP/1.0 means that response is not chunked and not compressed...
+$CURL --http1.0 -o $OUT_HTTP1 $URI
+# ...but it may be compressed with an explicitly set Accept-Encoding
+# header
+$CURL --http1.0 -H "Accept-Encoding: gzip, *;q=0" -o $OUT_HTTP1.gz $URI
+$CURL --http1.0 -H "Accept-Encoding: zstd, *;q=0" -o $OUT_HTTP1.zstd $URI
+$CURL --http1.0 -H "Accept-Encoding: br, *;q=0" -o $OUT_HTTP1.br $URI
+# ...or with IPC buffer compression if the Accept header specifies codecs.
+$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_HTTP1+zstd $URI
+$CURL --http1.0 -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_HTTP1+lz4 $URI
+
+# HTTP/1.1 means compression is on by default...
+# ...but it can be refused with the Accept-Encoding: identity header.
+$CURL -H "Accept-Encoding: identity" -o $OUT_CHUNKED $URI
+# ...with gzip if no Accept-Encoding header is set.
+$CURL -o $OUT_CHUNKED.gz $URI
+# ...or with the compression algorithm specified in the Accept-Encoding.
+$CURL -H "Accept-Encoding: zstd, *;q=0" -o $OUT_CHUNKED.zstd $URI
+$CURL -H "Accept-Encoding: br, *;q=0" -o $OUT_CHUNKED.br $URI
+# ...or with IPC buffer compression if the Accept header specifies codecs.
+$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=\"zstd, lz4\"" -o $OUT_CHUNKED+zstd $URI
+$CURL -H "Accept: application/vnd.apache.arrow.stream; codecs=lz4" -o $OUT_CHUNKED+lz4 $URI
diff --git a/http/get_compressed/README.md b/http/get_compressed/python/client/README.md
similarity index 64%
copy from http/get_compressed/README.md
copy to http/get_compressed/python/client/README.md
index dde6e20..1285a74 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/python/client/README.md
@@ -17,6 +17,16 @@
under the License.
-->
-# HTTP GET Arrow Data: Compression Examples
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+This directory contains an HTTP client implemented in Python that issues multiple
+requests to one of the server examples implemented in the parent directory,
+negotiating different compression algorithms for the Arrow IPC stream data.
+
+To run this example, first start one of the compressed server examples in the
+parent directory, then:
+
+```sh
+pip install pyarrow
+python client.py
+```
diff --git a/http/get_compressed/python/client/client.py b/http/get_compressed/python/client/client.py
new file mode 100644
index 0000000..d09aeb0
--- /dev/null
+++ b/http/get_compressed/python/client/client.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import urllib.request
+import pyarrow as pa
+import time
+
+URI = "http://localhost:8008"
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def make_request(uri, compression):
+    coding = "identity" if compression.startswith("identity") else compression
+    # urllib.request.urlopen() always sends an HTTP/1.1 request
+    # with Accept-Encoding: identity, so we need to setup a request
+    # object with custom headers to request a specific compression
+    headers = {
+        "Accept-Encoding": f"{coding}, *;q=0",
+    }
+    if compression.startswith("identity+"):
+        # request IPC buffer compression instead of HTTP compression
+        ipc_codec = compression.split("+")[1]
+        headers["Accept"] = f'{ARROW_STREAM_FORMAT};codecs="{ipc_codec}"'
+    request = urllib.request.Request(uri, headers=headers)
+
+    response = urllib.request.urlopen(request)
+    content_type = response.headers["Content-Type"]
+    if not content_type.startswith(ARROW_STREAM_FORMAT):
+        raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
+    if compression.startswith("identity"):
+        return response
+    # IANA nomenclature for Brotli is "br" and not "brotli"
+    compression = "brotli" if compression == "br" else compression
+    return pa.CompressedInputStream(response, compression)
+
+
+def request_and_process(uri, compression):
+    batches = []
+    log_prefix = f"{'[' + compression + ']':>10}:"
+    print(
+        f"{log_prefix} Requesting data from {uri} with `{compression}` compression strategy."
+    )
+    start_time = time.time()
+    response = make_request(uri, compression)
+    with pa.ipc.open_stream(response) as reader:
+        schema = reader.schema
+        time_to_schema = time.time() - start_time
+        try:
+            batch = reader.read_next_batch()
+            time_to_first_batch = time.time() - start_time
+            batches.append(batch)
+            while True:
+                batch = reader.read_next_batch()
+                batches.append(batch)
+        except StopIteration:
+            pass
+        processing_time = time.time() - start_time
+        reader_stats = reader.stats
+    print(
+        f"{log_prefix} Schema received in {time_to_schema:.3f} seconds."
+        f" schema=({', '.join(schema.names)})."
+    )
+    print(
+        f"{log_prefix} First batch received and processed in"
+        f" {time_to_first_batch:.3f} seconds"
+    )
+    print(
+        f"{log_prefix} Processing of all batches completed in"
+        f" {processing_time:.3f} seconds."
+    )
+    print(f"{log_prefix}", reader_stats)
+    return batches
+
+
+# HTTP compression
+request_and_process(URI, "identity")
+request_and_process(URI, "zstd")
+request_and_process(URI, "br")
+request_and_process(URI, "gzip")
+# using IPC buffer compression instead of HTTP compression
+request_and_process(URI, "identity+zstd")
+request_and_process(URI, "identity+lz4")
diff --git a/http/get_compressed/README.md b/http/get_compressed/python/server/README.md
similarity index 70%
copy from http/get_compressed/README.md
copy to http/get_compressed/python/server/README.md
index dde6e20..cf4bed7 100644
--- a/http/get_compressed/README.md
+++ b/http/get_compressed/python/server/README.md
@@ -17,6 +17,16 @@
under the License.
-->
-# HTTP GET Arrow Data: Compression Examples
-This directory contains examples of HTTP servers/clients that transmit/receive data in the Arrow IPC streaming format and use compression (in various ways) to reduce the size of the transmitted data.
+# HTTP GET Arrow Data: Compressed Arrow Data Examples
+
+This directory contains an example of an HTTP server implemented in Python
+able to serve Arrow IPC streams compressed with different algorithms negotiated
+with the client via different standard HTTP headers.
+
+To run this example:
+
+```sh
+pip install pyarrow
+python server.py
+```
diff --git a/http/get_compressed/python/server/server.py b/http/get_compressed/python/server/server.py
new file mode 100644
index 0000000..c82c551
--- /dev/null
+++ b/http/get_compressed/python/server/server.py
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import choice, randint
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import io
+import pyarrow as pa
+import pyarrow.compute as pc
+import re
+import socketserver
+import string
+
+# use dictionary encoding for the ticker column
+USE_DICTIONARY_ENCODING = True
+
+
+def random_string(alphabet, length):
+    return "".join(choice(alphabet) for _ in range(length))
+
+
+def random_name(initial):
+    length = randint(3, 7)
+    return initial + random_string(string.ascii_lowercase, length)
+
+
+def example_tickers(num_tickers):
+    tickers = []
+    while len(tickers) < num_tickers:
+        length = randint(3, 4)
+        random_ticker = random_string(string.ascii_uppercase, length)
+        if random_ticker not in tickers:
+            tickers.append(random_ticker)
+    return tickers
+
+
+the_ticker_type = (
+    pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else pa.utf8()
+)
+the_schema = pa.schema(
+    [
+        ("ticker", the_ticker_type),
+        ("price", pa.int64()),
+        ("volume", pa.int64()),
+    ]
+)
+
+
+def example_batch(tickers, length):
+    ticker_indices = []
+    price = []
+    volume = []
+    for _ in range(length):
+        ticker_indices.append(randint(0, len(tickers) - 1))
+        price.append(randint(1, 1000) * 100)
+        volume.append(randint(1, 10000))
+    ticker = (
+        pa.DictionaryArray.from_arrays(ticker_indices, tickers)
+        if USE_DICTIONARY_ENCODING
+        else pc.take(tickers, ticker_indices, boundscheck=False)
+    )
+    return pa.RecordBatch.from_arrays([ticker, price, volume], schema=the_schema)
+
+
+def example_batches(tickers):
+    # these parameters are chosen to generate a response
+    # of ~1 GB and chunks of ~140 KB (uncompressed)
+    total_records = 42_000_000
+    batch_len = 6 * 1024
+    # all the batches sent are random slices of the larger base batch
+    base_batch = example_batch(tickers, length=8 * batch_len)
+    batches = []
+    records = 0
+    while records < total_records:
+        length = min(batch_len, total_records - records)
+        offset = randint(0, base_batch.num_rows - length - 1)
+        batch = base_batch.slice(offset, length)
+        batches.append(batch)
+        records += length
+    return batches
+
+
+# end of example data generation
+
+# what the HTTP spec calls a token (any character except CTLs or separators)
+TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)"
+# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616)
+LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*"
+TOKENIZER_PAT = re.compile(
+    f"(?P<TOK>{TOKEN_RE})"
+    r'|(?P<QUOTED>"([^"\\]|\\.)*")'  # a quoted string (escaped pairs allowed)
+    r"|(?P<COMMA>,)"
+    r"|(?P<SEMI>;)"
+    r"|(?P<EQ>=)"
+    f"|(?P<SKIP>{LWS_RE})"  # LWS is skipped
+    r"|(?P<MISMATCH>.+)",
+    flags=re.ASCII,  # HTTP headers are encoded in ASCII
+)
+
+
+def parse_header_value(header_name, header_value):
+    """
+    Parse the Accept or Accept-Encoding request header values.
+
+    Returns
+    -------
+    list of (str, dict)
+        The list of lowercase tokens and their parameters in the order they
+        appear in the header. The parameters are stored in a dictionary where
+        the keys are the parameter names and the values are the parameter
+        values. If a parameter is not followed by an equal sign and a value,
+        the value is None.
+    """
+
+    def unexpected(label, value):
+        msg = f"Malformed {header_name} header: unexpected {label} at {value!r}"
+        return ValueError(msg)
+
+    def tokenize():
+        for mo in re.finditer(TOKENIZER_PAT, header_value):
+            kind = mo.lastgroup
+            if kind == "SKIP":
+                continue
+            elif kind == "MISMATCH":
+                raise unexpected("character", mo.group())
+            yield (kind, mo.group())
+
+    tokens = tokenize()
+
+    def expect(expected_kind):
+        kind, text = next(tokens)
+        if kind != expected_kind:
+            raise unexpected("token", text)
+        return text
+
+    accepted = []
+    while True:
+        try:
+            name, params = None, {}
+            name = expect("TOK").lower()
+            kind, text = next(tokens)
+            while True:
+                if kind == "COMMA":
+                    accepted.append((name, params))
+                    break
+                if kind == "SEMI":
+                    ident = expect("TOK")
+                    params[ident] = None  # init param to None
+                    kind, text = next(tokens)
+                    if kind != "EQ":
+                        continue
+                    kind, text = next(tokens)
+                    if kind in ["TOK", "QUOTED"]:
+                        if kind == "QUOTED":
+                            text = text[1:-1]  # remove the quotes
+                        params[ident] = text  # set param to value
+                        kind, text = next(tokens)
+                        continue
+                raise unexpected("token", text)
+        except StopIteration:
+            break
+    if name is not None:
+        # any unfinished ;param=value sequence or trailing separators are ignored
+        accepted.append((name, params))
+    return accepted
+
+
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def pick_ipc_codec(accept_header, available, default):
+    """
+    Pick the IPC stream codec according to the Accept header.
+
+    This is used when deciding which codec to use for compression of IPC buffer
+    streams. This is a feature of the Arrow IPC stream format and is different
+    from the HTTP content-coding used to compress the entire HTTP response.
+
+    This is how a client may specify the IPC buffer compression codecs it
+    accepts:
+
+        Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
+
+    Parameters
+    ----------
+    accept_header : str|None
+        The value of the Accept header from an HTTP request.
+    available : list of str
+        The codecs that the server can provide in the order preferred by the
+        server. Example: ["zstd", "lz4"].
+    default : str|None
+        The codec to use if the client does not specify the ";codecs" parameter
+        in the Accept header.
+
+    Returns
+    -------
+    str|None
+        The codec that the server should use to compress the IPC buffer stream.
+        None if the client does not accept any of the available codecs
+        explicitly listed. ;codecs="" means no codecs are accepted.
+        If the client does not specify the codecs parameter, the default codec
+        is returned.
+    """
+    did_specify_codecs = False
+    accepted_codecs = []
+    if accept_header is not None:
+        accepted = parse_header_value("Accept", accept_header)
+        for media_range, params in accepted:
+            if (
+                media_range == "*/*"
+                or media_range == "application/*"
+                or media_range == ARROW_STREAM_FORMAT
+            ):
+                did_specify_codecs = "codecs" in params
+                codecs_str = params.get("codecs")
+                if codecs_str is None:
+                    continue
+                for codec in codecs_str.split(","):
+                    accepted_codecs.append(codec.strip())
+
+    for codec in available:
+        if codec in accepted_codecs:
+            return codec
+    return None if did_specify_codecs else default
+
+
+def pick_coding(accept_encoding_header, available):
+    """
+    Pick the content-coding according to the Accept-Encoding header.
+
+    This is used when using HTTP response compression instead of IPC buffer
+    compression.
+
+    Parameters
+    ----------
+    accept_encoding_header : str
+        The value of the Accept-Encoding header from an HTTP request.
+    available : list of str
+        The content-codings that the server can provide in the order preferred
+        by the server. Example: ["zstd", "br", "gzip"].
+
+    Returns
+    -------
+    str
+        The content-coding that the server should use to compress the response.
+        "identity" is returned if no acceptable content-coding is found in the
+        list of available codings.
+
+        None if the client does not accept any of the available content-codings
+        and doesn't accept "identity" (uncompressed) either. In this case,
+        a "406 Not Acceptable" response should be sent.
+    """
+    accepted = parse_header_value("Accept-Encoding", accept_encoding_header)
+
+    def qvalue_or(params, default):
+        qvalue = params.get("q")
+        if qvalue is not None:
+            try:
+                return float(qvalue)
+            except ValueError:
+                raise ValueError(f"Invalid qvalue in Accept-Encoding header: {qvalue}")
+        return default
+
+    if "identity" not in available:
+        available = available + ["identity"]
+    state = {}
+    for coding, params in accepted:
+        qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0)
+        if coding == "*":
+            for coding in available:
+                if coding not in state:
+                    state[coding] = qvalue
+        elif coding in available:
+            state[coding] = qvalue
+    # "identity" is always acceptable unless explicitly refused (;q=0)
+    if "identity" not in state:
+        state["identity"] = 0.0001
+    # all the candidate codings are now in the state dictionary and we
+    # have to consider only the ones that have the maximum qvalue
+    max_qvalue = max(state.values())
+    if max_qvalue == 0.0:
+        return None
+    for coding in available:
+        if coding in state and state[coding] == max_qvalue:
+            return coding
+    return None
+
+
+def pick_compression(headers, available_ipc_codecs, available_codings, default):
+ """
+ Pick the compression strategy based on the Accept and Accept-Encoding headers.
+
+ Parameters
+ ----------
+ headers : dict
+ The HTTP request headers.
+ available_ipc_codecs : list of str
+ The codecs that the server can provide for IPC buffer compression.
+ available_codings : list of str
+ The content-codings that the server can provide for HTTP response
+ compression.
+ default : str
+ The default compression strategy to use if the client does not explicitly
+ choose one.
+
+ Returns
+ -------
+ str|None
+ The compression strategy to use. It can be one of the following:
+ "identity": no compression at all.
+ "identity+zstd": No HTTP compression + IPC buffer compression with Zstd.
+ "identity+lz4": No HTTP compression + IPC buffer compression with LZ4.
+ "zstd", "br", "gzip", ...: HTTP compression without IPC buffer compression.
+ None means a "406 Not Acceptable" response should be sent.
+ """
+ accept = headers.get("Accept")
+ ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None)
+ if ipc_codec is None:
+ accept_encoding = headers.get("Accept-Encoding")
+ return (
+ default
+ if accept_encoding is None
+ else pick_coding(accept_encoding, available_codings)
+ )
+ return "identity+" + ipc_codec
+
+
+class LateClosingBytesIO(io.BytesIO):
+ """
+ BytesIO that does not close on close().
+
+ When a stream wrapping a file-like object is closed, the underlying
+ file-like object is also closed. This subclass prevents that from
+ happening by overriding the close method.
+
+ If we close a RecordBatchStreamWriter wrapping a BytesIO object, we want
+ to be able to create a memory view of the buffer. But that is only possible
+ if the BytesIO object is not closed yet.
+ """
+
+ def close(self):
+ pass
+
+ def close_now(self):
+ super().close()
+
+
+class SocketWriterSink(socketserver._SocketWriter):
+ """Wrapper to make wfile usable as a sink for Arrow stream writing."""
+
+ def __init__(self, wfile):
+ self.wfile = wfile
+
+ def writable(self):
+ return True
+
+ def write(self, b):
+ self.wfile.write(b)
+
+ def fileno(self):
+ return self.wfile.fileno()
+
+ def close(self):
+ """Do nothing so Arrow stream wrappers don't close the socket."""
+ pass
+
+
+def generate_chunk_buffers(schema, source, compression):
+ # the sink holds the buffer and we give a view of it to the caller
+ with LateClosingBytesIO() as sink:
+ # keep buffering until we have at least MIN_BUFFER_SIZE bytes
+ # in the buffer before yielding it to the caller. Setting it
+ # to 1 means we yield as soon as the compression blocks are
+ # formed and reach the sink buffer.
+ MIN_BUFFER_SIZE = 64 * 1024
+ if compression.startswith("identity"):
+ if compression == "identity+zstd":
+ options = pa.ipc.IpcWriteOptions(compression="zstd")
+ elif compression == "identity+lz4":
+ options = pa.ipc.IpcWriteOptions(compression="lz4")
+ else:
+ options = None
+ # source: RecordBatchReader
+ # |> writer: RecordBatchStreamWriter
+ # |> sink: LateClosingBytesIO
+ writer = pa.ipc.new_stream(sink, schema, options=options)
+ for batch in source:
+ writer.write_batch(batch)
+ if sink.tell() >= MIN_BUFFER_SIZE:
+ sink.truncate()
+ with sink.getbuffer() as buffer:
+ yield buffer
+ sink.seek(0)
+
+ writer.close() # write EOS marker and flush
+ else:
+ compression = "brotli" if compression == "br" else compression
+ with pa.CompressedOutputStream(sink, compression) as compressor:
+ # has the first buffer been yielded already?
+ sent_first = False
+ # source: RecordBatchReader
+ # |> writer: RecordBatchStreamWriter
+ # |> compressor: CompressedOutputStream
+ # |> sink: LateClosingBytesIO
+ writer = pa.ipc.new_stream(compressor, schema)
+ for batch in source:
+ writer.write_batch(batch)
+ # we try to yield a buffer ASAP no matter how small
+ if not sent_first and sink.tell() == 0:
+ compressor.flush()
+ pos = sink.tell()
+ if pos >= MIN_BUFFER_SIZE or (not sent_first and pos >= 1):
+ sink.truncate()
+ with sink.getbuffer() as buffer:
+ yield buffer
+ sink.seek(0)
+ sent_first = True
+
+ writer.close() # write EOS marker and flush
+ compressor.close()
+
+ sink.truncate()
+ with sink.getbuffer() as buffer:
+ yield buffer
+ sink.close_now()
+
+
+AVAILABLE_IPC_CODECS = ["zstd", "lz4"]
+"""List of available codecs for Arrow IPC buffer compression."""
+
+AVAILABLE_CODINGS = ["zstd", "br", "gzip"]
+"""
+List of available content-codings as used in HTTP.
+
+Note that Arrow stream classes refer to Brotli as "brotli" and not "br".
+"""
+
+
+class MyRequestHandler(BaseHTTPRequestHandler):
+ """
+ Response handler for a simple HTTP server.
+
+ This HTTP request handler serves a compressed HTTP response with an Arrow
+ stream in it or a (TODO) compressed Arrow stream in an uncompressed HTTP
+ response.
+
+ The Arrow data is randomly generated "trading data" with a schema consisting
+ of a ticker, price (in cents), and volume.
+ """
+
+ def _resolve_batches(self):
+ return pa.RecordBatchReader.from_batches(the_schema, all_batches)
+
+ def _send_not_acceptable(self, parsing_error=None):
+ self.send_response(406, "Not Acceptable")
+ self.send_header("Content-Type", "text/plain")
+ self.end_headers()
+ if parsing_error:
+ message = f"Error parsing header: {parsing_error}\n"
+ else:
+ message = "None of the available codings are accepted by this client.\n"
+ accept = self.headers.get("Accept")
+ if accept is not None:
+ message += f"`Accept` header was {accept!r}.\n"
+ accept_encoding = self.headers.get("Accept-Encoding")
+ if accept_encoding is not None:
+ message += f"`Accept-Encoding` header was {accept_encoding!r}.\n"
+ self.wfile.write(bytes(message, "utf-8"))
+
+ def do_GET(self):
+ # HTTP/1.0 requests don't get chunked responses
+ if self.request_version == "HTTP/1.0":
+ self.protocol_version = "HTTP/1.0"
+ chunked = False
+ else:
+ self.protocol_version = "HTTP/1.1"
+ chunked = True
+
+ # if client's intent cannot be derived from the headers, return
+ # uncompressed data for HTTP/1.0 requests and compressed data for
+ # HTTP/1.1 requests with the safest compression format choice: "gzip".
+ default_compression = (
+ "identity"
+ if self.request_version == "HTTP/1.0" or ("gzip" not in AVAILABLE_CODINGS)
+ else "gzip"
+ )
+ try:
+ compression = pick_compression(
+ self.headers,
+ AVAILABLE_IPC_CODECS,
+ AVAILABLE_CODINGS,
+ default_compression,
+ )
+ if compression is None:
+ self._send_not_acceptable()
+ return
+ except ValueError as e:
+ self._send_not_acceptable(str(e))
+ return
+
+ ### in a real application the data would be resolved from a database or
+ ### another source like a file and error handling would be done here
+ ### before the 200 OK response starts being sent to the client.
+ source = self._resolve_batches()
+
+ self.send_response(200)
+ ### set these headers if testing with a local browser-based client:
+ # self.send_header('Access-Control-Allow-Origin', 'http://localhost:8008')
+ # self.send_header('Access-Control-Allow-Methods', 'GET')
+ # self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+ self.send_header(
+ "Content-Type",
+ (
+ f"{ARROW_STREAM_FORMAT}; codecs={compression[9:]}"
+ if compression.startswith("identity+")
+ else ARROW_STREAM_FORMAT
+ ),
+ )
+ # suggest a default filename in case this response is saved by the user
+ self.send_header("Content-Disposition", r'attachment; filename="output.arrows"')
+
+ if not compression.startswith("identity"):
+ self.send_header("Content-Encoding", compression)
+ if chunked:
+ self.send_header("Transfer-Encoding", "chunked")
+ self.end_headers()
+ for buffer in generate_chunk_buffers(the_schema, source, compression):
+ self.wfile.write(f"{len(buffer):X}\r\n".encode("utf-8"))
+ self.wfile.write(buffer)
+ self.wfile.write("\r\n".encode("utf-8"))
+ self.wfile.write("0\r\n\r\n".encode("utf-8"))
+ else:
+ self.end_headers()
+ sink = SocketWriterSink(self.wfile)
+ for buffer in generate_chunk_buffers(the_schema, source, compression):
+ sink.write(buffer)
+
+
+print("Generating example data...")
+
+all_tickers = example_tickers(60)
+all_batches = example_batches(all_tickers)
+
+server_address = ("localhost", 8008)
+try:
+ httpd = HTTPServer(server_address, MyRequestHandler)
+ print(f"Serving on {server_address[0]}:{server_address[1]}...")
+ httpd.serve_forever()
+except KeyboardInterrupt:
+ print("Shutting down server")
+ httpd.socket.close()
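
The qvalue handling described in the pick_coding() docstring can be tried in
isolation. The following is a minimal sketch, not the code from this commit:
it assumes a simplified comma/semicolon split in place of the tokenizer-based
parse_header_value(), and the names parse_accept_encoding and negotiate are
hypothetical, introduced only for this illustration.

```python
def parse_accept_encoding(value):
    """Parse 'gzip;q=0.5, br' into [(coding, qvalue_or_None), ...].

    Simplified stand-in for the server's tokenizer-based parser (assumption).
    """
    entries = []
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue
        coding, _, params = item.partition(";")
        qvalue = None
        for param in params.split(";"):
            name, _, raw = param.strip().partition("=")
            if name.strip().lower() == "q":
                qvalue = float(raw.strip())  # ValueError on a malformed qvalue
        entries.append((coding.strip().lower(), qvalue))
    return entries


def negotiate(value, available):
    """Return the chosen content-coding, or None when a 406 is appropriate."""
    identity_q = 0.0001  # same low default weight the server gives "identity"
    candidates = list(available)
    if "identity" not in candidates:
        candidates.append("identity")
    state = {}
    for coding, qvalue in parse_accept_encoding(value):
        if qvalue is None:
            qvalue = identity_q if coding == "identity" else 1.0
        if coding == "*":
            # the wildcard only fills in codings not mentioned so far
            for candidate in candidates:
                state.setdefault(candidate, qvalue)
        elif coding in candidates:
            state[coding] = qvalue
    # "identity" is acceptable unless explicitly refused with q=0
    state.setdefault("identity", identity_q)
    best = max(state.values())
    if best == 0.0:
        return None
    # ties are broken by the server's preference order
    return next(c for c in candidates if state.get(c) == best)


if __name__ == "__main__":
    server_codings = ["zstd", "br", "gzip"]
    print(negotiate("gzip, br", server_codings))               # br
    print(negotiate("gzip;q=0.5, zstd;q=0.9", server_codings))  # zstd
    print(negotiate("deflate", server_codings))                 # identity
    print(negotiate("identity;q=0, deflate", server_codings))   # None
```

When several codings share the highest qvalue, the server's own ordering
decides, which is why "gzip, br" resolves to "br" with the list above.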
diff --git a/http/get_simple/python/client/urllib.request/client.py b/http/get_simple/python/client/urllib.request/client.py
index 1d5d198..a2f24de 100644
--- a/http/get_simple/python/client/urllib.request/client.py
+++ b/http/get_simple/python/client/urllib.request/client.py
@@ -25,7 +25,7 @@ start_time = time.time()
response = urllib.request.urlopen('http://localhost:8008')
content_type = response.headers['Content-Type']
-if content_type != ARROW_STREAM_FORMAT:
+if not content_type.startswith(ARROW_STREAM_FORMAT):
raise ValueError(f"Expected {ARROW_STREAM_FORMAT}, got {content_type}")
batches = []