This is an automated email from the ASF dual-hosted git repository.

ianmcook pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-experiments.git


The following commit(s) were added to refs/heads/main by this push:
     new bbbfaf6  Add Rust server and client for `get_simple` example (#8)
bbbfaf6 is described below

commit bbbfaf69d01d423758280e88cd69c7b4f9e03abc
Author: Matthijs Brobbel <[email protected]>
AuthorDate: Mon Mar 11 20:16:14 2024 +0100

    Add Rust server and client for `get_simple` example (#8)
    
    * Add Rust server and client for `get_simple` example
    
    * Remove and ignore `Cargo.lock`
    
    * Fix get request in client
    
    * Wrap span around the full example
    
    * Re-use the `BufReader`
    
    * Handle chunked transfer encoding in client
    
    * Apply suggestions from code review
    
    Co-authored-by: Ian Cook <[email protected]>
    
    ---------
    
    Co-authored-by: Ian Cook <[email protected]>
---
 http/get_simple/rs/.gitignore         |  19 +++++
 http/get_simple/rs/Cargo.toml         |  27 +++++++
 http/get_simple/rs/client/Cargo.toml  |  26 +++++++
 http/get_simple/rs/client/README.md   |  34 +++++++++
 http/get_simple/rs/client/src/main.rs | 106 ++++++++++++++++++++++++++
 http/get_simple/rs/server/Cargo.toml  |  31 ++++++++
 http/get_simple/rs/server/README.md   |  34 +++++++++
 http/get_simple/rs/server/src/main.rs | 135 ++++++++++++++++++++++++++++++++++
 8 files changed, 412 insertions(+)

diff --git a/http/get_simple/rs/.gitignore b/http/get_simple/rs/.gitignore
new file mode 100644
index 0000000..95a3a86
--- /dev/null
+++ b/http/get_simple/rs/.gitignore
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+/target
+Cargo.lock
diff --git a/http/get_simple/rs/Cargo.toml b/http/get_simple/rs/Cargo.toml
new file mode 100644
index 0000000..f8670a8
--- /dev/null
+++ b/http/get_simple/rs/Cargo.toml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[workspace]
+resolver = "2"
+members = ["client", "server"]
+
+[workspace.dependencies]
+arrow-array = "50.0.0"
+arrow-ipc = "50.0.0"
+arrow-schema = "50.0.0"
+tracing = "0.1.40"
+tracing-subscriber = "0.3.18"
diff --git a/http/get_simple/rs/client/Cargo.toml 
b/http/get_simple/rs/client/Cargo.toml
new file mode 100644
index 0000000..1eec1bd
--- /dev/null
+++ b/http/get_simple/rs/client/Cargo.toml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "client"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+arrow-ipc.workspace = true
+tracing.workspace = true
+tracing-subscriber.workspace = true
diff --git a/http/get_simple/rs/client/README.md 
b/http/get_simple/rs/client/README.md
new file mode 100644
index 0000000..caeeab1
--- /dev/null
+++ b/http/get_simple/rs/client/README.md
@@ -0,0 +1,34 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# HTTP GET Arrow Data: Simple Rust Client Example
+
+This directory contains a minimal example of an HTTP client implemented in Rust. The client:
+
+1. Sends an HTTP GET request to a server.
+2. Receives an HTTP 200 response from the server, with the response body containing an Arrow IPC stream of record batches.
+3. Adds the record batches to a list as they are received.
+
+To run this example, first start one of the server examples in the parent directory, then:
+
+```sh
+cargo r --release
+```
+> [!NOTE]  
+> This client example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC reader](https://github.com/apache/arrow-rs/issues/1207).
diff --git a/http/get_simple/rs/client/src/main.rs 
b/http/get_simple/rs/client/src/main.rs
new file mode 100644
index 0000000..7e593b4
--- /dev/null
+++ b/http/get_simple/rs/client/src/main.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_ipc::reader::StreamReader;
+use std::{
+    io::{BufRead, BufReader, Read, Write},
+    net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
+};
+use tracing::{error, info, info_span};
+use tracing_subscriber::fmt::format::FmtSpan;
+
+/// Fetches the Arrow IPC stream served on `localhost:8000` and logs how many
+/// record batches and rows were received.
+fn main() {
+    // Configure tracing subscriber.
+    tracing_subscriber::fmt()
+        .with_span_events(FmtSpan::CLOSE)
+        .init();
+
+    info_span!("get_simple").in_scope(|| {
+        // Connect to server.
+        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
+        match TcpStream::connect(addr) {
+            Ok(stream) => {
+                info_span!("Reading Arrow IPC stream", %addr)
+                    .in_scope(|| read_arrow_stream(stream, addr));
+            }
+            Err(error) => {
+                error!(%error, "Connection failed")
+            }
+        }
+    })
+}
+
+/// Sends `GET /` to the server at `addr` over `stream`, reads the Arrow IPC
+/// stream from the response body and logs the number of batches and rows.
+///
+/// Every failure is logged instead of panicking. This covers I/O errors, a
+/// malformed or non-200 response, and invalid IPC data. A truncated or
+/// corrupt stream is therefore never reported as a successful read.
+fn read_arrow_stream(mut stream: TcpStream, addr: SocketAddr) {
+    info!("Connected");
+
+    // Send request.
+    let request = format!("GET / HTTP/1.1\r\nHost: {addr}\r\n\r\n");
+    if let Err(error) = stream.write_all(request.as_bytes()) {
+        error!(%error, "Failed to send request");
+        return;
+    }
+
+    // Read the status line and headers; only the transfer encoding matters here.
+    let mut reader = BufReader::new(stream);
+    let chunked = match read_response_header(&mut reader) {
+        Ok(chunked) => chunked,
+        Err(error) => {
+            error!(%error, "Failed to read response header");
+            return;
+        }
+    };
+
+    // Read Arrow IPC stream. A chunked body is de-chunked into memory first;
+    // otherwise the body is read until the server closes the connection.
+    let batches: Result<Vec<_>, _> = if chunked {
+        let body = match read_chunked_body(&mut reader) {
+            Ok(body) => body,
+            Err(error) => {
+                error!(%error, "Failed to read chunked response body");
+                return;
+            }
+        };
+        StreamReader::try_new_unbuffered(body.as_slice(), None)
+            .and_then(|batches| batches.collect())
+    } else {
+        StreamReader::try_new_unbuffered(reader, None).and_then(|batches| batches.collect())
+    };
+
+    // Report either the full result or the first decoding error.
+    match batches {
+        Ok(batches) => info!(
+            batches = batches.len(),
+            rows = batches.iter().map(|rb| rb.num_rows()).sum::<usize>()
+        ),
+        Err(error) => error!(%error, "Failed to read Arrow IPC stream"),
+    }
+}
+
+/// Reads the HTTP response status line and header fields from `reader`,
+/// stopping after the empty line that ends the header section.
+///
+/// Returns `Ok(true)` when a `Transfer-Encoding` header lists `chunked`.
+///
+/// # Errors
+///
+/// - `UnexpectedEof` if the connection closes before the header section ends.
+/// - `InvalidData` if the status code is not `200`.
+fn read_response_header(reader: &mut impl BufRead) -> std::io::Result<bool> {
+    let mut line = String::new();
+
+    // Status line, e.g. "HTTP/1.1 200 OK". This client only understands a
+    // 200 response whose body is the Arrow stream.
+    read_line_or_eof(reader, &mut line)?;
+    if line.split_whitespace().nth(1) != Some("200") {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            format!("unexpected response status line: {:?}", line.trim_end()),
+        ));
+    }
+
+    // Header fields, up to the empty line that ends the header section.
+    let mut chunked = false;
+    loop {
+        read_line_or_eof(reader, &mut line)?;
+        let field = line.trim_end();
+        if field.is_empty() {
+            return Ok(chunked);
+        }
+        // Header names and the "chunked" coding are case-insensitive.
+        if let Some((name, value)) = field.split_once(':') {
+            if name.trim().eq_ignore_ascii_case("transfer-encoding")
+                && value
+                    .split(',')
+                    .any(|coding| coding.trim().eq_ignore_ascii_case("chunked"))
+            {
+                chunked = true;
+            }
+        }
+    }
+}
+
+/// Reads a `Transfer-Encoding: chunked` body from `reader` and returns the
+/// concatenated chunk data.
+///
+/// Chunk extensions (anything after `;` on a size line) are ignored. Reading
+/// stops at the zero-size last chunk; any trailer fields after it are not read.
+///
+/// # Errors
+///
+/// - `InvalidData` on a malformed chunk size.
+/// - `UnexpectedEof` if the connection closes mid-body.
+fn read_chunked_body(mut reader: impl BufRead) -> std::io::Result<Vec<u8>> {
+    let mut body = Vec::new();
+    let mut line = String::new();
+    loop {
+        // Chunk size: hexadecimal, optionally followed by `;extensions`.
+        read_line_or_eof(&mut reader, &mut line)?;
+        let size_field = line.split(';').next().unwrap_or("").trim();
+        let chunk_size = u64::from_str_radix(size_field, 16)
+            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
+
+        if chunk_size == 0 {
+            // Terminating chunk.
+            return Ok(body);
+        }
+
+        // Append chunk to buffer; `take` stops at EOF, so check the length.
+        let received = reader.by_ref().take(chunk_size).read_to_end(&mut body)?;
+        if received as u64 != chunk_size {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "connection closed in the middle of a chunk",
+            ));
+        }
+
+        // Terminating CR-LF sequence of the chunk data.
+        read_line_or_eof(&mut reader, &mut line)?;
+    }
+}
+
+/// Clears `line` and reads the next line, including its terminator, into it.
+///
+/// # Errors
+///
+/// Returns `UnexpectedEof` when the stream is already at its end. Callers that
+/// loop over lines therefore cannot spin forever on a closed connection.
+fn read_line_or_eof(reader: &mut impl BufRead, line: &mut String) -> std::io::Result<()> {
+    line.clear();
+    if reader.read_line(line)? == 0 {
+        return Err(std::io::Error::new(
+            std::io::ErrorKind::UnexpectedEof,
+            "connection closed unexpectedly",
+        ));
+    }
+    Ok(())
+}
diff --git a/http/get_simple/rs/server/Cargo.toml 
b/http/get_simple/rs/server/Cargo.toml
new file mode 100644
index 0000000..f961a4c
--- /dev/null
+++ b/http/get_simple/rs/server/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "server"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+arrow-array.workspace = true
+arrow-ipc.workspace = true
+arrow-schema.workspace = true
+once_cell = "1.19.0"
+rand = "0.8.5"
+rayon = "1.9.0"
+tracing.workspace = true
+tracing-subscriber.workspace = true
diff --git a/http/get_simple/rs/server/README.md 
b/http/get_simple/rs/server/README.md
new file mode 100644
index 0000000..70ac8f7
--- /dev/null
+++ b/http/get_simple/rs/server/README.md
@@ -0,0 +1,34 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# HTTP GET Arrow Data: Simple Rust Server Example
+
+This directory contains a minimal example of an HTTP server implemented in Rust. The server:
+
+1. Creates a list of record batches and populates it with synthesized data.
+2. Listens for HTTP requests from clients.
+3. Upon receiving a request, sends an HTTP 200 response with the body containing an Arrow IPC stream of record batches.
+
+To run this example:
+
+```sh
+cargo r --release
+```
+> [!NOTE]  
+> This server example implements low-level HTTP/1.1 details directly, instead of using an HTTP library. We intend to update the example to use [hyper](https://docs.rs/hyper/latest/hyper/) after [arrow-rs has an async Arrow IPC writer](https://github.com/apache/arrow-rs/issues/1207).
diff --git a/http/get_simple/rs/server/src/main.rs 
b/http/get_simple/rs/server/src/main.rs
new file mode 100644
index 0000000..be3d981
--- /dev/null
+++ b/http/get_simple/rs/server/src/main.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    io::{BufRead, BufReader, Result, Write},
+    net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener},
+    sync::Arc,
+    thread,
+};
+
+use arrow_array::{Int64Array, RecordBatch};
+use arrow_ipc::writer::StreamWriter;
+use arrow_schema::{DataType, Field, Fields, Schema};
+use once_cell::sync::Lazy;
+use rand::{distributions::Standard, prelude::*};
+use rayon::{iter, prelude::*};
+use tracing::{error, info, info_span};
+use tracing_subscriber::fmt::format::FmtSpan;
+
+/// Number of rows in each full record batch.
+const RECORDS_PER_BATCH: usize = 4096;
+
+/// Total number of rows served in debug builds. It is smaller than in release
+/// builds because the data is generated eagerly at start-up.
+#[cfg(debug_assertions)]
+const TOTAL_RECORDS: usize = 100_000;
+
+/// Total number of rows served in release builds.
+#[cfg(not(debug_assertions))]
+const TOTAL_RECORDS: usize = 100_000_000;
+
+/// Schema for random data
+/// Schema for random data: four nullable `Int64` columns named `a`, `b`, `c`
+/// and `d`.
+static SCHEMA: Lazy<Arc<Schema>> = Lazy::new(|| {
+    let fields: Fields = ['a', 'b', 'c', 'd']
+        .into_iter()
+        .map(|name| Field::new(name, DataType::Int64, true))
+        .collect();
+    Arc::new(Schema::new(fields))
+});
+
+/// Random data
+/// Random data: `TOTAL_RECORDS` rows split into batches of `RECORDS_PER_BATCH`
+/// rows, followed by one shorter batch holding the remainder.
+///
+/// The remainder batch is omitted when it would be empty. Batches are built in
+/// parallel with rayon, and each worker uses its own `rand::thread_rng`.
+static DATA: Lazy<Vec<RecordBatch>> = Lazy::new(|| {
+    info_span!("data", TOTAL_RECORDS, RECORDS_PER_BATCH).in_scope(|| {
+        info!("Generating random data");
+        let full_batches = TOTAL_RECORDS / RECORDS_PER_BATCH;
+        let remainder = TOTAL_RECORDS % RECORDS_PER_BATCH;
+
+        // Generate record batches with random data, one item per batch length.
+        iter::repeatn(RECORDS_PER_BATCH, full_batches)
+            .chain(iter::once(remainder))
+            // Skip the trailing batch when TOTAL_RECORDS divides evenly.
+            .filter(|&len| len > 0)
+            .map_init(rand::thread_rng, |rng, len| {
+                RecordBatch::try_new(
+                    Arc::clone(&SCHEMA),
+                    (0..SCHEMA.fields().len())
+                        .map(|_| {
+                            Arc::new(
+                                rng.sample_iter::<i64, Standard>(Standard)
+                                    .take(len)
+                                    .collect::<Int64Array>(),
+                            ) as _
+                        })
+                        .collect(),
+                )
+                // One Int64 column of `len` rows per schema field always
+                // matches SCHEMA; failing here is a bug, not a skippable batch.
+                .expect("generated columns must match SCHEMA")
+            })
+            .collect()
+    })
+});
+
+/// Handles one client connection. It discards the HTTP request, then responds
+/// with `200 OK` and streams every batch in `DATA` as an Arrow IPC stream.
+///
+/// I/O and Arrow errors, such as a client disconnecting mid-stream, are logged
+/// instead of panicking the connection thread.
+fn get_simple(stream: std::net::TcpStream) {
+    info!("Incoming connection");
+
+    if let Err(error) = write_arrow_response(stream) {
+        error!(%error, "Failed to serve request");
+    }
+}
+
+/// Reads and discards the request headers on `stream`. It then writes the
+/// response header and the Arrow IPC stream body, and shuts the socket down.
+///
+/// The body has no content-length and is not chunked, so closing the
+/// connection is what marks its end.
+fn write_arrow_response(
+    mut stream: std::net::TcpStream,
+) -> std::result::Result<(), Box<dyn std::error::Error>> {
+    // Ignore incoming request: consume lines up to the empty line ending the headers.
+    for _ in BufReader::new(&mut stream)
+        .lines()
+        .take_while(|line| line.as_ref().is_ok_and(|line| !line.is_empty()))
+    {}
+
+    // Buffer the socket. StreamWriter may issue several small writes per
+    // message, and each would otherwise be a separate syscall.
+    let mut writer = std::io::BufWriter::new(stream);
+
+    // Write response header. The body is close-delimited, so tell the client
+    // that this connection will not be reused.
+    writer.write_all(
+        concat!(
+            "HTTP/1.1 200 OK\r\n",
+            "content-type: application/vnd.apache.arrow.stream\r\n",
+            "connection: close\r\n",
+            "\r\n",
+        )
+        .as_bytes(),
+    )?;
+
+    // Stream the body.
+    let mut writer = StreamWriter::try_new(writer, &SCHEMA)?;
+    for batch in DATA.iter() {
+        writer.write(batch)?;
+    }
+    writer.finish()?;
+
+    // Flush the buffer and recover the socket so it can be shut down.
+    let stream = writer
+        .into_inner()?
+        .into_inner()
+        .map_err(|error| error.into_error())?;
+    stream.shutdown(std::net::Shutdown::Both)?;
+    Ok(())
+}
+
+/// Generates the random data, then serves it as an Arrow IPC stream to every
+/// client connecting on `localhost:8000`, one thread per connection.
+fn main() -> Result<()> {
+    // Install a fmt subscriber that also reports when spans close.
+    tracing_subscriber::fmt()
+        .with_span_events(FmtSpan::CLOSE)
+        .init();
+
+    // Build the data up front so no request pays for its generation.
+    let _ = Lazy::force(&DATA);
+
+    // Bind to localhost:8000.
+    let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000);
+    let listener = TcpListener::bind(bind_addr)?;
+    info!(%bind_addr, "Listening");
+
+    // Accept forever. Failed accepts are logged and skipped, and every accepted
+    // connection is served on its own detached thread.
+    loop {
+        let (stream, remote_peer) = match listener.accept() {
+            Ok(connection) => connection,
+            Err(error) => {
+                error!(%error, "Connection failed");
+                continue;
+            }
+        };
+        thread::spawn(move || {
+            let span = info_span!("Writing Arrow IPC stream", %remote_peer);
+            span.in_scope(|| get_simple(stream))
+        });
+    }
+}

Reply via email to