ianmcook commented on code in PR #35:
URL: https://github.com/apache/arrow-experiments/pull/35#discussion_r1843886678


##########
http/get_compressed/python/server/server.py:
##########
@@ -0,0 +1,564 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from random import choice, randint
+from http.server import BaseHTTPRequestHandler, HTTPServer
+import io
+import pyarrow as pa
+import pyarrow.compute as pc
+import re
+import socketserver
+import string
+
+# use dictionary encoding for the ticker column
+USE_DICTIONARY_ENCODING = True
+
+
+def random_string(alphabet, length):
+    return "".join(choice(alphabet) for _ in range(length))
+
+
+def random_name(initial):
+    length = randint(3, 7)
+    return initial + random_string(string.ascii_lowercase, length)
+
+
+def example_tickers(num_tickers):
+    tickers = []
+    while len(tickers) < num_tickers:
+        length = randint(3, 4)
+        random_ticker = random_string(string.ascii_uppercase, length)
+        if random_ticker not in tickers:
+            tickers.append(random_ticker)
+    return tickers
+
+
+the_ticker_type = (
+    pa.dictionary(pa.int32(), pa.utf8()) if USE_DICTIONARY_ENCODING else 
pa.utf8()
+)
+the_schema = pa.schema(
+    [
+        ("ticker", the_ticker_type),
+        ("price", pa.int64()),
+        ("volume", pa.int64()),
+    ]
+)
+
+
+def example_batch(tickers, length):
+    ticker_indices = []
+    price = []
+    volume = []
+    for _ in range(length):
+        ticker_indices.append(randint(0, len(tickers) - 1))
+        price.append(randint(1, 1000) * 100)
+        volume.append(randint(1, 10000))
+    ticker = (
+        pa.DictionaryArray.from_arrays(ticker_indices, tickers)
+        if USE_DICTIONARY_ENCODING
+        else pc.take(tickers, ticker_indices, boundscheck=False)
+    )
+    return pa.RecordBatch.from_arrays([ticker, price, volume], 
schema=the_schema)
+
+
+def example_batches(tickers):
+    # these parameters are chosen to generate a response
+    # of ~1 GB and chunks of ~140 KB (uncompressed)
+    total_records = 42_000_000
+    batch_len = 6 * 1024
+    # all the batches sent are random slices of the larger base batch
+    base_batch = example_batch(tickers, length=8 * batch_len)
+    batches = []
+    records = 0
+    while records < total_records:
+        length = min(batch_len, total_records - records)
+        offset = randint(0, base_batch.num_rows - length - 1)
+        batch = base_batch.slice(offset, length)
+        batches.append(batch)
+        records += length
+    return batches
+
+
+# end of example data generation
+
+# what the HTTP spec calls a token (any character except CTLs or separators)
+TOKEN_RE = r"(?:[A-Za-z0-9!#$%&'*+./^_`|~-]+)"
+# [L]inear [W]hite [S]pace pattern (HTTP/1.1 - RFC 2616)
+LWS_RE = r"(?:[ \t]|\r\n[ \t]+)*"
+TOKENIZER_PAT = re.compile(
+    f"(?P<TOK>{TOKEN_RE})"
+    r'|(?P<QUOTED>"([^"\\]|\\.)*")'  # a quoted string (escaped pairs allowed)
+    r"|(?P<COMMA>,)"
+    r"|(?P<SEMI>;)"
+    r"|(?P<EQ>=)"
+    f"|(?P<SKIP>{LWS_RE})"  # LWS is skipped
+    r"|(?P<MISMATCH>.+)",
+    flags=re.ASCII,  # HTTP headers are encoded in ASCII
+)
+
+
+def parse_header_value(header_name, header_value):
+    """
+    Parse the Accept or Accept-Encoding request header values.
+
+    Returns
+    -------
+    list of (str, dict)
+        The list of lowercase tokens and their parameters in the order they
+        appear in the header. The parameters are stored in a dictionary where
+        the keys are the parameter names and the values are the parameter
+        values. If a parameter is not followed by an equal sign and a value,
+        the value is None.
+    """
+
+    def unexpected(label, value):
+        msg = f"Malformed {header_name} header: unexpected {label} at 
{value!r}"
+        return ValueError(msg)
+
+    def tokenize():
+        for mo in re.finditer(TOKENIZER_PAT, header_value):
+            kind = mo.lastgroup
+            if kind == "SKIP":
+                continue
+            elif kind == "MISMATCH":
+                raise unexpected("character", mo.group())
+            yield (kind, mo.group())
+
+    tokens = tokenize()
+
+    def expect(expected_kind):
+        kind, text = next(tokens)
+        if kind != expected_kind:
+            raise unexpected("token", text)
+        return text
+
+    accepted = []
+    while True:
+        try:
+            name, params = None, {}
+            name = expect("TOK").lower()
+            kind, text = next(tokens)
+            while True:
+                if kind == "COMMA":
+                    accepted.append((name, params))
+                    break
+                if kind == "SEMI":
+                    ident = expect("TOK")
+                    params[ident] = None  # init param to None
+                    kind, text = next(tokens)
+                    if kind != "EQ":
+                        continue
+                    kind, text = next(tokens)
+                    if kind in ["TOK", "QUOTED"]:
+                        if kind == "QUOTED":
+                            text = text[1:-1]  # remove the quotes
+                        params[ident] = text  # set param to value
+                        kind, text = next(tokens)
+                        continue
+                raise unexpected("token", text)
+        except StopIteration:
+            break
+    if name is not None:
+        # any unfinished ;param=value sequence or trailing separators are 
ignored
+        accepted.append((name, params))
+    return accepted
+
+
+ARROW_STREAM_FORMAT = "application/vnd.apache.arrow.stream"
+
+
+def pick_ipc_codec(accept_header, available, default):
+    """
+    Pick the IPC stream codec according to the Accept header.
+
+    This is used when deciding which codec to use for compression of IPC buffer
+    streams. This is a feature of the Arrow IPC stream format and is different
+    from the HTTP content-coding used to compress the entire HTTP response.
+
+    This is how a client may specify the IPC buffer compression codecs it
+    accepts:
+
+        Accept: application/vnd.apache.arrow.ipc; codecs="zstd, lz4"

Review Comment:
   ```suggestion
           Accept: application/vnd.apache.arrow.stream; codecs="zstd, lz4"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to