niebayes opened a new issue, #7329:
URL: https://github.com/apache/arrow-rs/issues/7329
**Describe the bug**
<!--
A clear and concise description of what the bug is.
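-->
Calling `FlightSqlServiceClient::execute_ingest` with an input stream that yields an error (step 6 of the test below), or with an input stream that yields no record batches at all (step 7), does not behave correctly.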
**To Reproduce**
<!--
Steps to reproduce the behavior:
-->
I have written a test to reproduce the bug:
``` rust
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::str::FromStr;
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use arrow_flight::decode::FlightRecordBatchStream;
    use arrow_flight::error::FlightError;
    use arrow_flight::sql::client::FlightSqlServiceClient;
    use arrow_flight::sql::server::PeekableFlightDataStream;
    use arrow_flight::sql::CommandStatementIngest;
    use arrow_flight::sql::SqlInfo;
    use arrow_flight::{flight_service_server::FlightServiceServer, sql::server::FlightSqlService};
    use futures::TryStreamExt;
    use tokio::sync::oneshot;
    use tokio::time::{timeout, Duration};
    use tonic::transport::{Channel, Endpoint};
    use tonic::{transport::Server, Request, Status};

    /// Upper bound for a single `execute_ingest` call. A call that never
    /// completes (the failure mode this reproduction guards against) turns into
    /// a test failure instead of hanging the test run forever.
    const INGEST_TIMEOUT: Duration = Duration::from_secs(10);

    /// Minimal Flight SQL server that accepts bulk ingestion and reports the
    /// number of rows it received.
    #[derive(Clone)]
    struct DummyFlightSqlServer;

    #[tonic::async_trait]
    impl FlightSqlService for DummyFlightSqlServer {
        type FlightService = DummyFlightSqlServer;

        /// Execute a bulk ingestion: decode every record batch in the request
        /// and return the total row count.
        async fn do_put_statement_ingest(
            &self,
            _ticket: CommandStatementIngest,
            request: Request<PeekableFlightDataStream>,
        ) -> Result<i64, Status> {
            let stream = FlightRecordBatchStream::new_from_flight_data(
                request.into_inner().map_err(FlightError::from),
            );
            let batches = stream
                .try_collect::<Vec<_>>()
                .await
                .map_err(|e| Status::unknown(e.to_string()))?;
            let affected_rows = batches.iter().map(RecordBatch::num_rows).sum::<usize>();
            i64::try_from(affected_rows).map_err(|e| Status::internal(e.to_string()))
        }

        async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
    }

    /// Ask the OS for a currently free local port instead of hard-coding one.
    ///
    /// The probe listener is dropped before the server binds, so another
    /// process could in principle grab the port in between; this is far less
    /// likely than colliding on a fixed, well-known port.
    fn free_local_addr() -> SocketAddr {
        std::net::TcpListener::bind("127.0.0.1:0")
            .and_then(|listener| listener.local_addr())
            .expect("failed to reserve a local port for the test server")
    }

    /// Connect to the server at `addr`, retrying briefly while it starts up.
    ///
    /// Replaces a fixed sleep: returns as soon as the server accepts a
    /// connection, and panics if it is not reachable within ~5 seconds.
    async fn connect_with_retry(addr: SocketAddr) -> Channel {
        let endpoint = Endpoint::from_str(&format!("http://{addr}")).unwrap();
        for _ in 0..50 {
            if let Ok(channel) = endpoint.connect().await {
                return channel;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        panic!("test Flight SQL server at {addr} did not become reachable");
    }

    /// The ingestion command used by every step of the test.
    fn ingest_command() -> CommandStatementIngest {
        CommandStatementIngest {
            catalog: None,
            schema: None,
            table: "t".into(),
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_flight_sql_ingest() {
        // 1. Create a dummy Flight SQL server.
        let server = DummyFlightSqlServer;

        // 2. Start the server on a free local port.
        let addr = free_local_addr();
        let (tx, rx) = oneshot::channel::<()>();
        let server_handle = tokio::spawn(async move {
            Server::builder()
                .add_service(FlightServiceServer::new(server))
                .serve_with_shutdown(addr, async {
                    rx.await.ok();
                })
                .await
                .unwrap();
        });

        // 3. Create a Flight SQL client once the server accepts connections.
        let channel = connect_with_retry(addr).await;
        let mut client = FlightSqlServiceClient::new(channel);

        // 4. Prepare data for ingestion.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap();

        // 5. Execute ingestion of a well-formed stream.
        let stream = futures::stream::iter(vec![Ok(batch)]);
        let affected_rows = timeout(INGEST_TIMEOUT, client.execute_ingest(ingest_command(), stream))
            .await
            .expect("execute_ingest with a valid input stream did not complete")
            .unwrap();
        assert_eq!(affected_rows, 3);

        // 6. Execute ingestion with a fallible stream: the stream's error must
        //    reach the caller, and the call must not hang.
        let stream =
            futures::stream::iter(vec![Err(FlightError::ProtocolError("error".into()))]);
        let result = timeout(INGEST_TIMEOUT, client.execute_ingest(ingest_command(), stream))
            .await
            .expect("execute_ingest with a failing input stream did not complete");
        let err = result.expect_err("an error from the input stream must be returned to the caller");
        println!("ingest error: {}", err);

        // 7. Execute ingestion with an empty stream: the call must complete,
        //    and if it succeeds it must not report any affected rows.
        let stream = futures::stream::empty();
        let result = timeout(INGEST_TIMEOUT, client.execute_ingest(ingest_command(), stream))
            .await
            .expect("execute_ingest with an empty input stream did not complete");
        match result {
            Ok(rows) => assert_eq!(rows, 0, "an empty ingest must not report affected rows"),
            Err(e) => println!("ingest error: {}", e),
        }

        // Shutdown the server.
        tx.send(())
            .expect("server task exited before shutdown was requested");
        server_handle.await.unwrap();
    }
}
```
**Expected behavior**
<!--
A clear and concise description of what you expected to happen.
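-->
When the input stream yields an error, `execute_ingest` should return that error to the caller. When the input stream is empty, `execute_ingest` should complete and return a result instead of hanging or panicking.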
**Additional context**
<!--
Add any other context about the problem here.
-->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]