atharvalade commented on code in PR #2967:
URL: https://github.com/apache/iggy/pull/2967#discussion_r2958546016


##########
core/binary_protocol/src/responses/system/get_cluster_metadata.rs:
##########
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u16_le, 
read_u32_le};
+use bytes::{BufMut, BytesMut};
+
+/// `GetClusterMetadata` response: cluster name and list of nodes.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][nodes_count:4 LE][ClusterNodeResponse]*
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterMetadataResponse {
+    pub name: String,
+    pub nodes: Vec<ClusterNodeResponse>,
+}
+
+/// A single node within the cluster metadata.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][ip_len:4 LE][ip:N]
+/// [tcp:2 LE][quic:2 LE][http:2 LE][websocket:2 LE]
+/// [role:1][status:1]
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterNodeResponse {
+    pub name: String,
+    pub ip: String,
+    pub tcp_port: u16,
+    pub quic_port: u16,
+    pub http_port: u16,
+    pub websocket_port: u16,
+    pub role: u8,
+    pub status: u8,
+}
+
+const NODE_FIXED_SIZE: usize = 4 + 4 + 8 + 1 + 1; // name_len + ip_len + ports 
+ role + status
+
+impl WireEncode for ClusterNodeResponse {
+    fn encoded_size(&self) -> usize {
+        NODE_FIXED_SIZE + self.name.len() + self.ip.len()
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.name.len() as u32);
+        buf.put_slice(self.name.as_bytes());
+        buf.put_u32_le(self.ip.len() as u32);
+        buf.put_slice(self.ip.as_bytes());
+        buf.put_u16_le(self.tcp_port);
+        buf.put_u16_le(self.quic_port);
+        buf.put_u16_le(self.http_port);
+        buf.put_u16_le(self.websocket_port);
+        buf.put_u8(self.role);
+        buf.put_u8(self.status);
+    }
+}
+
+impl WireDecode for ClusterNodeResponse {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = read_u32_le(buf, 0)? as usize;
+        let name = read_str(buf, 4, name_len)?;
+        let pos = 4 + name_len;
+
+        let ip_len = read_u32_le(buf, pos)? as usize;
+        let ip = read_str(buf, pos + 4, ip_len)?;
+        let pos = pos + 4 + ip_len;
+
+        let tcp_port = read_u16_le(buf, pos)?;
+        let quic_port = read_u16_le(buf, pos + 2)?;
+        let http_port = read_u16_le(buf, pos + 4)?;
+        let websocket_port = read_u16_le(buf, pos + 6)?;
+        let role = read_u8(buf, pos + 8)?;
+        let status = read_u8(buf, pos + 9)?;
+
+        Ok((
+            Self {
+                name,
+                ip,
+                tcp_port,
+                quic_port,
+                http_port,
+                websocket_port,
+                role,
+                status,
+            },
+            pos + 10,
+        ))
+    }
+}
+
+impl WireEncode for ClusterMetadataResponse {
+    fn encoded_size(&self) -> usize {
+        4 + self.name.len()
+            + 4
+            + self
+                .nodes
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.name.len() as u32);
+        buf.put_slice(self.name.as_bytes());
+        buf.put_u32_le(self.nodes.len() as u32);
+        for node in &self.nodes {
+            node.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for ClusterMetadataResponse {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let name_len = read_u32_le(buf, 0)? as usize;
+        let name = read_str(buf, 4, name_len)?;
+        let mut pos = 4 + name_len;
+
+        let nodes_count = read_u32_le(buf, pos)? as usize;
+        pos += 4;
+
+        let mut nodes = Vec::with_capacity(nodes_count);

Review Comment:
   `Vec::with_capacity(nodes_count)` allocates eagerly before any node is 
decoded, so a crafted nodes_count of `u32::MAX` tries to reserve ~200 GB and 
OOMs. I think it'll be a good idea to cap the pre-allocation to something like 
`nodes_count.min(remaining_buf_len / 18)` where 18 is the minimum encoded node 
size.



##########
core/binary_protocol/src/consensus/operation.rs:
##########
@@ -95,64 +95,27 @@ impl Operation {
     }
 
     /// Bidirectional mapping: `Operation` -> client command code.
+    ///
+    /// Delegates to the dispatch table as the single source of truth.
     #[must_use]
     pub const fn to_command_code(&self) -> Option<u32> {
-        use crate::codes;
         match self {
             Self::Reserved => None,
-            Self::CreateStream => Some(codes::CREATE_STREAM_CODE),
-            Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE),
-            Self::DeleteStream => Some(codes::DELETE_STREAM_CODE),
-            Self::PurgeStream => Some(codes::PURGE_STREAM_CODE),
-            Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE),
-            Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE),
-            Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE),
-            Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE),
-            Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE),
-            Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE),
-            Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE),
-            Self::CreateConsumerGroup => 
Some(codes::CREATE_CONSUMER_GROUP_CODE),
-            Self::DeleteConsumerGroup => 
Some(codes::DELETE_CONSUMER_GROUP_CODE),
-            Self::CreateUser => Some(codes::CREATE_USER_CODE),
-            Self::UpdateUser => Some(codes::UPDATE_USER_CODE),
-            Self::DeleteUser => Some(codes::DELETE_USER_CODE),
-            Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE),
-            Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE),
-            Self::CreatePersonalAccessToken => 
Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE),
-            Self::DeletePersonalAccessToken => 
Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE),
-            Self::SendMessages => Some(codes::SEND_MESSAGES_CODE),
-            Self::StoreConsumerOffset => 
Some(codes::STORE_CONSUMER_OFFSET_CODE),
+            _ => match crate::dispatch::lookup_by_operation(*self) {

Review Comment:
   The `_` wildcard here means that if someone adds a new Operation variant but 
forgets to add it to COMMAND_TABLE, this silently returns None instead of 
giving a compiler error. The old exhaustive match caught that at compile time. 
how about listing all variants explicitly (delegating to the dispatch table is 
fine) so rustc keeps enforcing coverage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to