alamb commented on code in PR #6201: URL: https://github.com/apache/arrow-rs/pull/6201#discussion_r1715879125
########## arrow-flight/src/sql/client.rs: ##########
@@ -227,6 +227,35 @@ impl FlightSqlServiceClient<Channel> {
         Ok(result.record_count)
     }
 
+    /// Execute a bulk ingest on the server and return the number of records added
+    pub async fn execute_ingest(
+        &mut self,
+        command: CommandStatementIngest,
+        batches: Vec<RecordBatch>,

Review Comment:
   I see two potential problems with this implementation:
   1. It requires the client to buffer the entire dataset into memory
   2. It is not consistent with other APIs that send `RecordBatch`es to the server, such as `do_put`:
   ```rust
   /// Push a stream to the flight service associated with a particular flight stream.
   pub async fn do_put(
       &mut self,
       request: impl tonic::IntoStreamingRequest<Message = FlightData>,
   ) -> Result<Streaming<PutResult>, ArrowError> {
   ```
   I think it is non-obvious that this craziness means "RecordBatchStream":
   ```rust
   request: impl tonic::IntoStreamingRequest<Message = FlightData>,
   ```
   So in other words, I think it would be better if `execute_ingest` did the same thing (rather than internally encoding the stream).

########## arrow-flight/tests/flight_sql_client.rs: ##########
@@ -63,12 +69,49 @@ pub async fn test_begin_end_transaction() {
         .is_err());
 }
 
+#[tokio::test]
+pub async fn test_execute_ingest() {
+    let test_server = FlightSqlServiceImpl::new();
+    let fixture = TestFixture::new(test_server.service()).await;
+    let channel = fixture.channel().await;
+    let mut flight_sql_client = FlightSqlServiceClient::new(channel);
+    let cmd = CommandStatementIngest {
+        table_definition_options: Some(TableDefinitionOptions {
+            if_not_exist: TableNotExistOption::Create.into(),
+            if_exists: TableExistsOption::Fail.into(),
+        }),
+        table: String::from("test"),
+        schema: None,
+        catalog: None,
+        temporary: true,
+        transaction_id: None,
+        options: HashMap::default(),
+    };
+    let expected_rows = 10;
+    let batches = vec![
+        make_primitive_batch(5),
+        make_primitive_batch(3),
+        make_primitive_batch(2),
+    ];
+    let actual_rows = flight_sql_client
+        .execute_ingest(cmd, batches)
+        .await
+        .expect("ingest should succeed");
+    assert_eq!(actual_rows, expected_rows);
+}
+
 #[derive(Clone)]
 pub struct FlightSqlServiceImpl {
     transactions: Arc<Mutex<HashMap<String, ()>>>,
 }
 
 impl FlightSqlServiceImpl {
+    pub fn new() -> Self {

Review Comment:
   👍

########## arrow-flight/src/sql/client.rs: ##########
@@ -227,6 +227,35 @@ impl FlightSqlServiceClient<Channel> {
         Ok(result.record_count)
     }
 
+    /// Execute a bulk ingest on the server and return the number of records added
+    pub async fn execute_ingest(
+        &mut self,
+        command: CommandStatementIngest,
+        batches: Vec<RecordBatch>,

Review Comment:
   I took a shot at making this change, and here is what I came up with, for your consideration: https://github.com/djanderson/arrow-rs/pull/1
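Purely as an illustration of the direction suggested above (a stream-based `execute_ingest`, consistent with `do_put`), the sketch below shows one possible shape for such an API and how a caller holding a `Vec<RecordBatch>` could still use it. The `futures::Stream` bound, the `FlightError` item type, and the `batches_as_stream` helper are assumptions made for this example; they are not taken from the linked PR.

```rust
use arrow_array::RecordBatch;
use arrow_flight::error::FlightError;
use futures::{stream, Stream};

// Hypothetical signature sketch (an assumption, not the final API): accept any
// stream of RecordBatches so the client never has to buffer the whole dataset.
//
//     pub async fn execute_ingest<S>(
//         &mut self,
//         command: CommandStatementIngest,
//         stream: S,
//     ) -> Result<i64, ArrowError>
//     where
//         S: Stream<Item = Result<RecordBatch, FlightError>> + Send + 'static,
//     { /* encode the stream as FlightData and forward it via do_put */ }

/// Adapter a caller could use to keep the "I already have a Vec of batches"
/// ergonomics of the original proposal: wrap the Vec in an already-ready stream.
fn batches_as_stream(
    batches: Vec<RecordBatch>,
) -> impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 'static {
    stream::iter(batches.into_iter().map(Ok))
}
```

Under those assumptions, the test above would only change at the call site, e.g. `flight_sql_client.execute_ingest(cmd, batches_as_stream(batches)).await`, while callers producing data incrementally could pass their own stream and avoid buffering.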