felipecrv commented on code in PR #35:
URL: https://github.com/apache/arrow-experiments/pull/35#discussion_r1797397257


##########
http/get_compressed/python/server/server.py:
##########
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import choice, randint
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import io
+import pyarrow as pa
+import pyarrow.compute as pc
+import re
+import socketserver
+import string
+
+# use dictionary encoding for the ticker column
+USE_DICTIONARY_ENCODING = True
+
+
+def random_string(alphabet, length):
+    return "".join(choice(alphabet) for _ in range(length))
+
+
+def random_name(initial):
+    length = randint(3, 7)
+    return initial + random_string(string.ascii_lowercase, length)
+
+
+def example_tickers(num_tickers):
+    tickers = []
+    while len(tickers) < num_tickers:
+        length = randint(3, 4)
+        random_ticker = random_string(string.ascii_uppercase, length)
+        if random_ticker not in tickers:
+            tickers.append(random_ticker)
+    return tickers
+
+
+the_ticker_type = (
+    pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else 
pa.utf8()
+)
+the_schema = pa.schema(
+    [
+        ("ticker", the_ticker_type),
+        ("price", pa.int64()),
+        ("volume", pa.int64()),
+    ]
+)
+
+
+def example_batch(tickers, length):
+    ticker_indices = []
+    price = []
+    volume = []
+    for _ in range(length):
+        ticker_indices.append(randint(0, len(tickers) - 1))
+        price.append(randint(1, 1000) * 100)
+        volume.append(randint(1, 10000))
+    ticker = (
+        pa.DictionaryArray.from_arrays(ticker_indices, tickers)
+        if USE_DICTIONARY_ENCODING
+        else pc.take(tickers, ticker_indices, boundscheck=False)
+    )
+    return pa.RecordBatch.from_arrays([ticker, price, volume], 
schema=the_schema)
+
+
+def example_batches(tickers):
+    # these parameters are chosen to generate a response
+    # of ~1 GB and chunks of ~140 KB (uncompressed)
+    total_records = 42_000_000
+    batch_len = 6 * 1024
+    # all the batches sent are random slices of the larger base batch
+    base_batch = example_batch(tickers, length=8 * batch_len)
+    batches = []
+    records = 0
+    while records < total_records:
+        length = min(batch_len, total_records - records)
+        offset = randint(0, base_batch.num_rows - length - 1)
+        batch = base_batch.slice(offset, length)
+        batches.append(batch)
+        records += length
+    return batches
+
+
+# end of example data generation
+
+# what the HTTP spec calls a token (any character except CTLs or separators)
+TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)"
+# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616)
+LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*"
+TOKENIZER_PAT = re.compile(
+    f"(?P<TOK>{TOKEN_RE})"
+    r'|(?P<QUOTED>"([^"\\]|\\.)*")'  # a quoted string (escaped pairs allowed)
+    r"|(?P<COMMA>,)"
+    r"|(?P<SEMI>;)"
+    r"|(?P<EQ>=)"
+    f"|(?P<SKIP>{LWS_RE})"  # LWS is skipped
+    r"|(?P<MISMATCH>.+)",
+    flags=re.ASCII,  # HTTP headers are encoded in ASCII
+)
+
+
+def parse_header_value(header_name, header_value):
+    """
+    Parse the Accept or Accept-Encoding request header values.
+
+    Returns
+    -------
+    list of (str, dict)
+        The list of lowercase tokens and their parameters in the order they
+        appear in the header. The parameters are stored in a dictionary where
+        the keys are the parameter names and the values are the parameter
+        values. If a parameter is not followed by an equal sign and a value,
+        the value is None.
+    """
+
+    def unexpected(label, value):
+        msg = f"Malformed {header_name} header: unexpected {label} at 
{value!r}"
+        return ValueError(msg)
+
+    def tokenize():
+        for mo in re.finditer(TOKENIZER_PAT, header_value):
+            kind = mo.lastgroup
+            if kind == "SKIP":
+                continue
+            elif kind == "MISMATCH":
+                raise unexpected("character", mo.group())
+            yield (kind, mo.group())
+
+    tokens = tokenize()
+
+    def expect(expected_kind):
+        kind, text = next(tokens)
+        if kind != expected_kind:
+            raise unexpected("token", text)
+        return text
+
+    accepted = []
+    while True:
+        try:
+            name, params = None, {}
+            name = expect("TOK").lower()
+            kind, text = next(tokens)
+            while True:
+                if kind == "COMMA":
+                    accepted.append((name, params))
+                    break
+                if kind == "SEMI":
+                    ident = expect("TOK")
+                    params[ident] = None  # init param to None
+                    kind, text = next(tokens)
+                    if kind != "EQ":
+                        continue
+                    kind, text = next(tokens)
+                    if kind in ["TOK", "QUOTED"]:
+                        if kind == "QUOTED":
+                            text = text[1:-1]  # remove the quotes
+                        params[ident] = text  # set param to value
+                        kind, text = next(tokens)
+                        continue
+                raise unexpected("token", text)
+        except StopIteration:
+            break
+    if name is not None:
+        # any unfinished ;param=value sequence or trailing separators are 
ignored
+        accepted.append((name, params))
+    return accepted
+
+
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def pick_ipc_codec(accept_header, available, default):
+    """
+    Pick the IPC stream codec according to the Accept header.
+
+    This is used when deciding which codec to use for compression of IPC buffer
+    streams. This is a feature of the Arrow IPC stream format and is different
+    from the HTTP content-coding used to compress the entire HTTP response.
+
+    This is how a client may specify the IPC buffer compression codecs it
+    accepts:
+
+        Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4"
+
+    Parameters
+    ----------
+    accept_header : str|None
+        The value of the Accept header from an HTTP request.
+    available : list of str
+        The codecs that the server can provide in the order preferred by the
+        server. Example: ["zstd", "lz4"].
+    default : str|None
+        The codec to use if the client does not specify the ";codecs" parameter
+        in the Accept header.
+
+    Returns
+    -------
+    str|None
+        The codec that the server should use to compress the IPC buffer stream.
+        None if the client does not accept any of the available codecs
+        explicitly listed. ;codecs="" means no codecs are accepted.
+        If the client does not specify the codecs parameter, the default codec
+        is returned.
+    """
+    did_specify_codecs = False
+    accepted_codecs = []
+    if accept_header is not None:
+        accepted = parse_header_value("Accept", accept_header)
+        for media_range, params in accepted:
+            if (
+                media_range == "*/*"
+                or media_range == "application/*"
+                or media_range == ARROW_STREAM_FORMAT
+            ):
+                did_specify_codecs = "codecs" in params
+                codecs_str = params.get("codecs")
+                if codecs_str is None:
+                    continue
+                for codec in codecs_str.split(","):
+                    accepted_codecs.append(codec.strip())
+
+    for codec in available:
+        if codec in accepted_codecs:
+            return codec
+    return None if did_specify_codecs else default
+
+
+def pick_coding(accept_encoding_header, available):
+    """
+    Pick the content-coding according to the Accept-Encoding header.
+
+    This is used when using HTTP response compression instead of IPC buffer
+    compression.
+
+    Parameters
+    ----------
+    accept_encoding_header : str
+        The value of the Accept-Encoding header from an HTTP request.
+    available : list of str
+        The content-codings that the server can provide in the order preferred
+        by the server. Example: ["zstd", "br", "gzip"].
+
+    Returns
+    -------
+    str
+        The content-coding that the server should use to compress the response.
+        "identity" is returned if no acceptable content-coding is found in the
+        list of available codings.
+
+        None if the client does not accept any of the available content-codings
+        and doesn't accept "identity" (uncompressed) either. In this case,
+        a "406 Not Acceptable" response should be sent.
+    """
+    accepted = parse_header_value("Accept-Encoding", accept_encoding_header)
+
+    def qvalue_or(params, default):
+        qvalue = params.get("q")
+        if qvalue is not None:
+            try:
+                return float(qvalue)
+            except ValueError:
+                raise ValueError(f"Invalid qvalue in Accept-Encoding header: 
{qvalue}")
+        return default
+
+    if "identity" not in available:
+        available = available + ["identity"]
+    state = {}
+    for coding, params in accepted:
+        qvalue = qvalue_or(params, 0.0001 if coding == "identity" else 1.0)
+        if coding == "*":
+            for coding in available:
+                if coding not in state:
+                    state[coding] = qvalue
+        elif coding in available:
+            state[coding] = qvalue
+    # "identity" is always acceptable unless explicitly refused (;q=0)
+    if "identity" not in state:
+        state["identity"] = 0.0001
+    # all the candidate codings are now in the state dictionary and we
+    # have to consider only the ones that have the maximum qvalue
+    max_qvalue = max(state.values())
+    if max_qvalue == 0.0:
+        return None
+    for coding in available:
+        if coding in state and state[coding] == max_qvalue:
+            return coding
+    return None
+
+
+def pick_compression(headers, available_ipc_codecs, available_codings, 
default):
+    """
+    Pick the compression strategy based on the Accept and Accept-Encoding 
headers.
+
+    Parameters
+    ----------
+    headers : dict
+        The HTTP request headers.
+    available_ipc_codecs : list of str
+        The codecs that the server can provide for IPC buffer compression.
+    available_codings : list of str
+        The content-codings that the server can provide for HTTP response
+        compression.
+    default : str
+        The default compression strategy to use if the client does explicitly
+        choose.
+
+    Returns
+    -------
+    str|None
+        The compression strategy to use. It can be one of the following:
+        "identity": no compression at all.
+        "identity+zstd": No HTTP compression + IPC buffer compression with 
Zstd.
+        "identity+lz4": No HTTP compression + IPC buffer compression with LZ4.
+        "zstd", "br", "gzip", ...: HTTP compression without IPC buffer 
compression.
+        None means a "406 Not Acceptable" response should be sent.
+    """
+    accept = headers.get("Accept")
+    ipc_codec = pick_ipc_codec(accept, available_ipc_codecs, default=None)
+    if ipc_codec is None:
+        accept_encoding = headers.get("Accept-Encoding")
+        return (
+            default
+            if accept_encoding is None
+            else pick_coding(accept_encoding, available_codings)
+        )
+    return "identity+" + ipc_codec
+
+
+class LateClosingBytesIO(io.BytesIO):

Review Comment:
   I’m citing this in the user@ mailing list as a way to get around the fact 
stream closing propagates across the entire chain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to