fresh-borzoni commented on code in PR #531:
URL: https://github.com/apache/fluss-rust/pull/531#discussion_r3208664301
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -72,6 +74,89 @@ impl fmt::Debug for SaslConfig {
}
}
+/// Represents the negotiated API versions between the client and a server
node.
+/// Built from the server's `ApiVersionsResponse` by intersecting each API's
+/// client-supported range with the server-supported range, keeping the highest
+/// usable version.
+#[derive(Clone, Debug)]
+pub struct ServerApiVersions {
+ versions: HashMap<ApiKey, Result<ApiVersion, String>>,
+}
+
+impl ServerApiVersions {
+ /// Build from the server's advertised API version list.
+ pub fn new(server_versions: &[PbApiVersion]) -> Self {
+ let mut versions = HashMap::new();
+ for sv in server_versions {
+ let api_key = ApiKey::from(i16::try_from(sv.api_key).unwrap());
Review Comment:
This can potentially panic. Java truncates the value to a short; shall we do something similar?
##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -72,6 +74,89 @@ impl fmt::Debug for SaslConfig {
}
}
+/// Represents the negotiated API versions between the client and a server
node.
+/// Built from the server's `ApiVersionsResponse` by intersecting each API's
+/// client-supported range with the server-supported range, keeping the highest
+/// usable version.
+#[derive(Clone, Debug)]
+pub struct ServerApiVersions {
+ versions: HashMap<ApiKey, Result<ApiVersion, String>>,
+}
+
+impl ServerApiVersions {
+ /// Build from the server's advertised API version list.
+ pub fn new(server_versions: &[PbApiVersion]) -> Self {
+ let mut versions = HashMap::new();
+ for sv in server_versions {
+ let api_key = ApiKey::from(i16::try_from(sv.api_key).unwrap());
+ // Skip unknown API keys — the client does not support them.
+ let client_range = match api_key.supported_versions() {
+ Some(range) => range,
+ None => continue,
+ };
+ let server_min = i16::try_from(sv.min_version).unwrap();
+ let server_max = i16::try_from(sv.max_version).unwrap();
+ let min_version = client_range.min().0.max(server_min);
+ let max_version = client_range.max().0.min(server_max);
+ if min_version > max_version {
+ versions.insert(
+ api_key,
+ Err(format!(
+ "The server does not support {:?} with version in
range [{},{}]. \
+ The supported range is [{},{}].",
+ api_key,
+ client_range.min(),
+ client_range.max(),
+ server_min,
+ server_max,
+ )),
+ );
+ } else {
+ versions.insert(api_key, Ok(ApiVersion(max_version)));
+ }
+ }
+ Self { versions }
+ }
+
+ /// Get the negotiated (highest usable) version for a given API key.
+ pub fn highest_available_version(&self, api_key: ApiKey) ->
Result<ApiVersion, Error> {
+ match self.versions.get(&api_key) {
+ Some(Ok(version)) => Ok(*version),
+ Some(Err(msg)) => Err(Error::UnsupportedVersion {
+ message: msg.clone(),
+ }),
+ None => Err(Error::UnsupportedVersion {
+ message: format!("The server does not support {:?}", api_key),
+ }),
+ }
+ }
+}
+
+/// Resolve the API version to use for a given API key.
+///
+/// The `ApiVersions` request itself always uses `ApiVersion(0)` since it is
+/// sent before the version negotiation handshake.
+/// For all other requests, the handshake must have been completed; otherwise
+/// an error is returned to prevent silent fallback to an incorrect version.
+fn resolve_api_version_for(
+ api_versions: Option<&ServerApiVersions>,
+ api_key: ApiKey,
+) -> Result<ApiVersion, Error> {
+ if api_key == ApiKey::ApiVersion {
+ return Ok(ApiVersion(0));
Review Comment:
I think Java returns `apiKey.highestSupportedVersion` here; hard-coding 0
will eventually get out of sync with it.
##########
crates/fluss/src/proto/fluss_api.proto:
##########
@@ -24,6 +24,23 @@ message ErrorResponse {
optional string error_message = 2;
}
+// api versions request and response
+message ApiVersionsRequest {
+ required string client_software_name = 1;
+ required string client_software_version = 2;
+}
+
+message ApiVersionsResponse {
+ repeated PbApiVersion api_versions = 1;
+ optional int32 server_type = 2;
Review Comment:
`server_type` is added but never read. Java validates it and disconnects
on a coordinator/tablet mismatch. Let's file a follow-up so that we don't
forget about it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]