alamb commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1325671631
##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
);
}
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test]
+async fn test_do_put_prepared_statement() {
+ let test_server = FlightSqlServiceImpl {};
+ let fixture = TestFixture::new(&test_server).await;
+ let addr = fixture.addr;
+
+ let stdout = tokio::task::spawn_blocking(move || {
+ Command::cargo_bin("flight_sql_client")
+ .unwrap()
+ .env_clear()
+ .env("RUST_BACKTRACE", "1")
+ .env("RUST_LOG", "warn")
+ .arg("--host")
+ .arg(addr.ip().to_string())
+ .arg("--port")
+ .arg(addr.port().to_string())
+ .arg("prepared-statement-query")
+ .arg(PREPARED_QUERY)
+ .args(["-p", "$1=string"])
Review Comment:
💯 for the tests
##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
Ok(())
}
+ /// Submit parameters to the server, if any have been set on this prepared statement instance
+ async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+ if let Some(ref params_batch) = self.parameter_binding {
+ let cmd = CommandPreparedStatementQuery {
+ prepared_statement_handle: self.handle.clone(),
+ };
+
+ let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+ let flight_stream_builder = FlightDataEncoderBuilder::new()
+ .with_flight_descriptor(Some(descriptor))
+ .with_schema(params_batch.schema());
+ let flight_data = flight_stream_builder
+ .build(futures::stream::iter(
+ self.parameter_binding.clone().map(Ok),
+ ))
+ .try_collect::<Vec<_>>()
+ .await
+ .map_err(flight_error_to_arrow_error)?;
+
+ self.flight_sql_client
Review Comment:
I filed https://github.com/apache/arrow/issues/37720 and will circulate this
around
##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
/// Implementors may override to handle additional calls to do_put()
async fn do_put_fallback(
&self,
- _request: Request<Streaming<FlightData>>,
+ _request: Request<Peekable<Streaming<FlightData>>>,
Review Comment:
This server is the scaffolding to help people build FlightSQL servers; they
can always use the raw FlightService if they prefer. Thus I think the change in
API is less critical, and given the requirements it seems inevitable.
I think the only thing we should try to improve in this PR is the
documentation, to explain why `Peekable` is used (to lower the cognitive
burden on people trying to use this).
One potential option is, rather than using `Peekable<Streaming<...>>`
directly, to make our own wrapper, `PeekableStreaming` or something. While this
would require duplicating a bunch of the `Peekable` API, there would be a
natural place to document what it is for and how to use it, which I think would
lower the barrier to usage.
For example:
```rust
/// A wrapper around `Streaming` that allows inspection of the first message.
/// This is needed because sometimes the first request in the stream will contain
/// a [`FlightDescriptor`] in addition to potentially any data and the dispatch
/// logic must inspect this information.
///
/// # Example
/// <show an example here of calling `into_inner()` to get the original data back>
struct PeekableStreaming {
    inner: Peekable<Streaming<FlightData>>,
}

impl PeekableStreaming {
    /// Return the inner stream
    pub fn into_inner(self) -> Streaming<FlightData> {
        self.inner.into_inner()
    }
    ...
}
```
We could also potentially use something like `BoxStream<FlightData>`, but
that would lose the gRPC-specific features like status codes and trailers
exposed by
[`Streaming`](https://docs.rs/tonic/latest/tonic/struct.Streaming.html#method.trailers),
and it would be an API change as well.
Thus I think this design is the best of several less-than-ideal solutions.
To proceed, perhaps we can add some documentation on the `do_*_fallback` methods
that mentions the stream comes from `peekable`.
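To illustrate the peek-then-dispatch idea such documentation would describe, here is a minimal sketch. It uses a synchronous `std::iter::Peekable` as a stand-in for `Peekable<Streaming<FlightData>>`, so it is an analogy rather than the actual server code. The `Message` type, its fields, the `dispatch` function, and the handler names it returns are all hypothetical and are not part of arrow-flight or tonic.

```rust
use std::iter::Peekable;

/// Hypothetical stand-in for `FlightData`: in this sketch only the first
/// message of a stream carries a descriptor.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    descriptor: Option<String>,
    body: Vec<u8>,
}

/// Pick a handler by looking at the first message without consuming it.
/// The returned names are illustrative labels, not real method lookups.
fn dispatch<I: Iterator<Item = Message>>(stream: &mut Peekable<I>) -> &'static str {
    match stream.peek().and_then(|m| m.descriptor.as_deref()) {
        Some("prepared_statement_query") => "do_put_prepared_statement_query",
        _ => "do_put_fallback",
    }
}

fn main() {
    let messages = vec![
        Message {
            descriptor: Some("prepared_statement_query".to_string()),
            body: vec![1],
        },
        Message {
            descriptor: None,
            body: vec![2, 3],
        },
    ];
    let mut stream = messages.into_iter().peekable();

    let handler = dispatch(&mut stream);

    // Peeking did not consume anything: the chosen handler still receives
    // every message, including the first one that carried the descriptor.
    let remaining: Vec<Message> = stream.collect();
    let total_bytes: usize = remaining.iter().map(|m| m.body.len()).sum();
    println!(
        "{handler} receives {} messages ({total_bytes} body bytes)",
        remaining.len()
    );
}
```

The point of the sketch is that the dispatcher can read the descriptor and still hand the complete stream to the handler, which is why the fallback methods receive a peekable stream rather than a plain one.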
##########
arrow-flight/src/sql/client.rs:
##########
@@ -451,7 +456,9 @@ impl PreparedStatement<Channel> {
/// Executes the prepared statement update query on the server.
pub async fn execute_update(&mut self) -> Result<i64, ArrowError> {
- let cmd = CommandPreparedStatementQuery {
+ self.write_bind_params().await?;
+
+ let cmd = CommandPreparedStatementUpdate {
Review Comment:
I agree - `update` should use `CommandPreparedStatementUpdate`
https://github.com/apache/arrow/blob/15a8ac3ce4e3ac31f9f361770ad4a38c69102aa1/format/FlightSql.proto#L1769-L1780
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]