fresh-borzoni commented on code in PR #399:
URL: https://github.com/apache/fluss-rust/pull/399#discussion_r2871668423
##########
website/docs/user-guide/python/example/configuration.md:
##########
@@ -35,6 +35,7 @@ with await fluss.FlussConnection.create(config) as conn:
| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `connect-timeout` | TCP connect timeout in milliseconds | `120000` |
+| `request-timeout` | Per-request RPC timeout in milliseconds (also accepts `request-timeout-ms`) | `30000` |
Review Comment:
Should the api-reference be updated as well?
##########
bindings/python/src/config.rs:
##########
@@ -113,6 +113,11 @@ impl Config {
FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}"))
})?;
}
+ "request-timeout" | "request-timeout-ms" => {
Review Comment:
We decided to support consistent keys with a single canonical match for now.
Let's keep only `request-timeout` here, as it matches the Java config convention.
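
To illustrate the single-canonical-key idea, here is a minimal std-only sketch. `ClientOptions` and `set_option` are hypothetical names made up for this example and are not the actual types in `bindings/python/src/config.rs`; the point is only that the match arm accepts `request-timeout` and nothing else.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the real config struct.
#[derive(Debug, Default, PartialEq)]
struct ClientOptions {
    request_timeout: Option<Duration>,
}

/// Applies one key/value pair. Only the canonical key is matched;
/// the `request-timeout-ms` alias falls through to the unknown-key error.
fn set_option(opts: &mut ClientOptions, key: &str, value: &str) -> Result<(), String> {
    match key {
        "request-timeout" => {
            // The value is interpreted as milliseconds, as in the docs table.
            let ms = value
                .parse::<u64>()
                .map_err(|e| format!("Invalid value '{value}' for '{key}': {e}"))?;
            opts.request_timeout = Some(Duration::from_millis(ms));
            Ok(())
        }
        other => Err(format!("Unknown config key '{other}'")),
    }
}
```

With this shape, passing `request-timeout-ms` is reported as an unknown key instead of being silently accepted, which keeps the accepted key set aligned with a single convention.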
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -388,10 +408,36 @@ where
self.send_message(buf).await?;
_cleanup_on_cancel.message_sent();
- let mut response = rx.await.map_err(|e| Error::UnexpectedError {
- message: "Got recvError, some one close the channel".to_string(),
- source: Some(Box::new(e)),
- })??;
+ let mut response = match self.request_timeout {
+ Some(timeout) => match tokio::time::timeout(timeout, rx).await {
+ Ok(result) => result.map_err(|e| Error::UnexpectedError {
+ message: format!(
+ "Response channel closed for request_id={request_id} api_key={:?}; \
+ connection may be closed or poisoned",
+ R::API_KEY
+ ),
+ source: Some(Box::new(e)),
+ })??,
+ Err(_elapsed) => {
+ if let ConnectionState::RequestMap(map) = self.state.lock().deref_mut() {
+ map.remove(&request_id);
+ }
+ return Err(RpcError::RequestTimeout {
+ timeout,
+ api_key: R::API_KEY,
+ }
+ .into());
+ }
+ },
+ None => rx.await.map_err(|e| Error::UnexpectedError {
Review Comment:
We have almost the same code in both match arms.
Can we deduplicate it? It is starting to look clunky.
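
One possible shape for this, sketched with std-only types so it compiles without tokio: wait on the receiver once, with or without a timeout, and normalize both failure modes into one error type so the channel-closed mapping is written a single time. `WaitError` and `wait_for_response` are hypothetical names, and `std::sync::mpsc` stands in for the oneshot receiver and `tokio::time::timeout` used in the PR.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type covering both ways the wait can fail.
#[derive(Debug, PartialEq)]
enum WaitError {
    /// The sender was dropped before a response arrived.
    ChannelClosed,
    /// No response arrived within the configured timeout.
    TimedOut(Duration),
}

/// Waits for one response, applying the timeout only when it is configured.
/// Callers handle `ChannelClosed` in one place instead of once per arm.
fn wait_for_response<T>(rx: &Receiver<T>, timeout: Option<Duration>) -> Result<T, WaitError> {
    match timeout {
        Some(t) => rx.recv_timeout(t).map_err(|e| match e {
            RecvTimeoutError::Timeout => WaitError::TimedOut(t),
            RecvTimeoutError::Disconnected => WaitError::ChannelClosed,
        }),
        None => rx.recv().map_err(|_| WaitError::ChannelClosed),
    }
}
```

In the async code the same split would apply: first resolve the optional timeout into a single result, then build the `Error::UnexpectedError` message and do the request-map cleanup in one place each.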
##########
website/docs/user-guide/python/example/configuration.md:
##########
@@ -35,6 +35,7 @@ with await fluss.FlussConnection.create(config) as conn:
| `scanner.remote-log.read-concurrency` | Streaming read concurrency within a remote log file | `4` |
| `scanner.log.max-poll-records` | Max records returned in a single poll() | `500` |
| `connect-timeout` | TCP connect timeout in milliseconds | `120000` |
+| `request-timeout` | Per-request RPC timeout in milliseconds (also accepts `request-timeout-ms`) | `30000` |
Review Comment:
Do we need to update the docs for the other bindings as well?
##########
crates/fluss/src/client/connection.rs:
##########
@@ -41,17 +41,23 @@ impl FlussConnection {
.map_err(|msg| Error::IllegalArgument { message: msg })?;
let timeout = Duration::from_millis(arg.connect_timeout_ms);
+ let request_timeout = Duration::from_millis(arg.request_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Arc::new(
RpcClient::new()
.with_sasl(
arg.security_sasl_username.clone(),
arg.security_sasl_password.clone(),
)
- .with_timeout(timeout),
+ .with_timeout(timeout)
+ .with_request_timeout(request_timeout),
)
} else {
- Arc::new(RpcClient::new().with_timeout(timeout))
+ Arc::new(
+ RpcClient::new()
+ .with_timeout(timeout)
Review Comment:
Shall we rename this method as well?
Now that two timeout builder methods are chained, `with_timeout` is ambiguous.
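
For illustration, a minimal sketch of how the chain could read with explicit names. `RpcClientSketch` and `with_connect_timeout` are hypothetical and chosen only for this example; `with_request_timeout` mirrors the method added in this PR.

```rust
use std::time::Duration;

/// Hypothetical stand-in for `RpcClient`, holding only the two timeouts.
#[derive(Debug, Default)]
struct RpcClientSketch {
    connect_timeout: Option<Duration>,
    request_timeout: Option<Duration>,
}

impl RpcClientSketch {
    fn new() -> Self {
        Self::default()
    }

    /// Sets the TCP connect timeout (the role `with_timeout` plays today).
    fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = Some(timeout);
        self
    }

    /// Sets the per-request RPC timeout.
    fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = Some(timeout);
        self
    }
}
```

A call site would then read `RpcClientSketch::new().with_connect_timeout(timeout).with_request_timeout(request_timeout)`, so each argument's meaning is visible without checking the builder definition.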