This is an automated email from the ASF dual-hosted git repository.

hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a86be3a61 fix(server): warn and skip non-numeric filenames in consumer offset directories instead of panicking (#3135)
a86be3a61 is described below

commit a86be3a6150f91965da4a31272bd7aa17cdcb611
Author: Atharva Lade <[email protected]>
AuthorDate: Mon May 18 22:01:31 2026 -0500

    fix(server): warn and skip non-numeric filenames in consumer offset directories instead of panicking (#3135)
---
 core/integration/tests/mod.rs                      |   1 +
 core/integration/tests/storage/consumer_offsets.rs | 180 +++++++++++++++++++++
 core/integration/tests/storage/mod.rs              |  19 +++
 core/server/src/streaming/partitions/storage.rs    | 148 +++++++++++------
 4 files changed, 301 insertions(+), 47 deletions(-)

diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 7c543a333..c46be600b 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -40,6 +40,7 @@ mod mcp;
 mod sdk;
 mod server;
 mod state;
+mod storage;
 
 lazy_static! {
     static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/storage/consumer_offsets.rs b/core/integration/tests/storage/consumer_offsets.rs
new file mode 100644
index 000000000..0fbc628da
--- /dev/null
+++ b/core/integration/tests/storage/consumer_offsets.rs
@@ -0,0 +1,180 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy_common::{ConsumerKind, IggyError};
+use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets};
+use std::path::Path;
+use std::sync::atomic::Ordering;
+
+fn write_offset_file(dir: &Path, name: &str, offset: u64) {
+    std::fs::write(dir.join(name), offset.to_le_bytes()).unwrap();
+}
+
+#[test]
+fn load_consumer_offsets_valid_files() {
+    let dir = tempfile::tempdir().unwrap();
+    write_offset_file(dir.path(), "1", 100);
+    write_offset_file(dir.path(), "2", 200);
+    write_offset_file(dir.path(), "3", 300);
+
+    let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 3);
+    assert_eq!(offsets[0].consumer_id, 1);
+    assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 100);
+    assert_eq!(offsets[0].kind, ConsumerKind::Consumer);
+    assert_eq!(offsets[1].consumer_id, 2);
+    assert_eq!(offsets[1].offset.load(Ordering::Relaxed), 200);
+    assert_eq!(offsets[2].consumer_id, 3);
+    assert_eq!(offsets[2].offset.load(Ordering::Relaxed), 300);
+}
+
+#[test]
+fn load_consumer_offsets_skips_non_numeric_files() {
+    let dir = tempfile::tempdir().unwrap();
+    write_offset_file(dir.path(), ".DS_Store", 0);
+    write_offset_file(dir.path(), "backup.bak", 0);
+    write_offset_file(dir.path(), "1", 42);
+
+    let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 1);
+    assert_eq!(offsets[0].consumer_id, 1);
+    assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 42);
+}
+
+#[test]
+fn load_consumer_offsets_skips_truncated_files() {
+    let dir = tempfile::tempdir().unwrap();
+    std::fs::write(dir.path().join("1"), [0u8; 3]).unwrap();
+    std::fs::write(dir.path().join("2"), []).unwrap();
+    write_offset_file(dir.path(), "3", 500);
+
+    let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 1);
+    assert_eq!(offsets[0].consumer_id, 3);
+    assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 500);
+}
+
+#[test]
+fn load_consumer_offsets_empty_dir() {
+    let dir = tempfile::tempdir().unwrap();
+
+    let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert!(offsets.is_empty());
+}
+
+#[test]
+fn load_consumer_offsets_skips_directories() {
+    let dir = tempfile::tempdir().unwrap();
+    std::fs::create_dir(dir.path().join("123")).unwrap();
+    write_offset_file(dir.path(), "1", 77);
+
+    let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 1);
+    assert_eq!(offsets[0].consumer_id, 1);
+    assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 77);
+}
+
+#[test]
+fn load_consumer_offsets_nonexistent_dir() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().to_str().unwrap().to_string();
+    drop(dir);
+
+    let result = load_consumer_offsets(&path);
+
+    assert!(result.is_err());
+    assert!(matches!(
+        result.unwrap_err(),
+        IggyError::CannotReadConsumerOffsets(_)
+    ));
+}
+
+#[test]
+fn load_consumer_group_offsets_valid_files() {
+    let dir = tempfile::tempdir().unwrap();
+    write_offset_file(dir.path(), "1", 500);
+    write_offset_file(dir.path(), "2", 600);
+
+    let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 2);
+    for (group_id, offset) in &offsets {
+        assert_eq!(offset.kind, ConsumerKind::ConsumerGroup);
+        assert_eq!(offset.consumer_id, group_id.0 as u32);
+    }
+    let ids: Vec<u32> = offsets.iter().map(|(_, co)| co.consumer_id).collect();
+    assert!(ids.contains(&1));
+    assert!(ids.contains(&2));
+}
+
+#[test]
+fn load_consumer_group_offsets_skips_non_numeric_files() {
+    let dir = tempfile::tempdir().unwrap();
+    write_offset_file(dir.path(), ".DS_Store", 0);
+    write_offset_file(dir.path(), "notes.txt", 0);
+    write_offset_file(dir.path(), "5", 999);
+
+    let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 1);
+    assert_eq!(offsets[0].0.0, 5);
+    assert_eq!(offsets[0].1.consumer_id, 5);
+    assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 999);
+}
+
+#[test]
+fn load_consumer_group_offsets_skips_truncated_files() {
+    let dir = tempfile::tempdir().unwrap();
+    std::fs::write(dir.path().join("1"), [0u8; 4]).unwrap();
+    write_offset_file(dir.path(), "2", 750);
+
+    let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert_eq!(offsets.len(), 1);
+    assert_eq!(offsets[0].0.0, 2);
+    assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 750);
+}
+
+#[test]
+fn load_consumer_group_offsets_empty_dir() {
+    let dir = tempfile::tempdir().unwrap();
+
+    let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+    assert!(offsets.is_empty());
+}
+
+#[test]
+fn load_consumer_group_offsets_nonexistent_dir() {
+    let dir = tempfile::tempdir().unwrap();
+    let path = dir.path().to_str().unwrap().to_string();
+    drop(dir);
+
+    let result = load_consumer_group_offsets(&path);
+
+    assert!(result.is_err());
+    assert!(matches!(
+        result.unwrap_err(),
+        IggyError::CannotReadConsumerOffsets(_)
+    ));
+}
diff --git a/core/integration/tests/storage/mod.rs b/core/integration/tests/storage/mod.rs
new file mode 100644
index 000000000..8d4bac8b0
--- /dev/null
+++ b/core/integration/tests/storage/mod.rs
@@ -0,0 +1,19 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod consumer_offsets;
diff --git a/core/server/src/streaming/partitions/storage.rs b/core/server/src/streaming/partitions/storage.rs
index 13f4011f3..e641b3ec3 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -25,10 +25,9 @@ use compio::{
     fs::{self, OpenOptions, create_dir_all},
     io::AsyncWriteAtExt,
 };
-use err_trail::ErrContext;
 use iggy_common::{ConsumerKind, IggyError};
 use std::{io::Read, path::Path, sync::atomic::AtomicU64};
-use tracing::{error, trace};
+use tracing::{error, trace, warn};
 
 pub async fn create_partition_file_hierarchy(
     stream_id: usize,
@@ -170,20 +169,43 @@ pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyErro
     let mut consumer_offsets = Vec::new();
     let dir_entries = dir_entries.unwrap();
     for dir_entry in dir_entries {
-        let dir_entry = dir_entry.unwrap();
-        let metadata = dir_entry.metadata();
-        if metadata.is_err() {
-            break;
-        }
+        let dir_entry = match dir_entry {
+            Ok(entry) => entry,
+            Err(e) => {
+                warn!(
+                    "Failed to read directory entry in consumer offsets path: {path}, \
+                     error: {e}, skipping."
+                );
+                continue;
+            }
+        };
+
+        let metadata = match dir_entry.metadata() {
+            Ok(m) => m,
+            Err(e) => {
+                warn!(
+                    "Failed to read metadata for entry in consumer offsets path: {path}, \
+                     error: {e}, skipping."
+                );
+                continue;
+            }
+        };
 
-        if metadata.unwrap().is_dir() {
+        if metadata.is_dir() {
             continue;
         }
 
-        let name = dir_entry.file_name().into_string().unwrap();
-        let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
-            panic!("Invalid consumer ID file with name: '{}'.", name);
-        });
+        let name = dir_entry.file_name().to_string_lossy().to_string();
+        let consumer_id = match name.parse::<u32>() {
+            Ok(id) => id,
+            Err(_) => {
+                warn!(
+                    "Unexpected non-numeric consumer offset file: '{}', skipping.",
+                    name
+                );
+                continue;
+            }
+        };
 
         let path = dir_entry.path();
         let path = path.to_str();
@@ -193,19 +215,25 @@ pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyErro
         }
 
         let path = path.unwrap().to_string();
-        let file = std::fs::File::open(&path)
-            .error(|e: &std::io::Error| {
-                format!("{COMPONENT} (error: {e}) - failed to open offset file, path: {path}")
-            })
-            .map_err(|_| IggyError::CannotReadFile)?;
+        let file = match std::fs::File::open(&path) {
+            Ok(f) => f,
+            Err(e) => {
+                warn!(
+                    "{COMPONENT} (error: {e}) - failed to open offset file, \
+                     path: {path}, skipping."
+                );
+                continue;
+            }
+        };
         let mut cursor = std::io::Cursor::new(file);
         let mut offset = [0; 8];
-        cursor
-            .get_mut().read_exact(&mut offset)
-            .error(|e: &std::io::Error| {
-                format!("{COMPONENT} (error: {e}) - failed to read consumer offset from file, path: {path}")
-            })
-            .map_err(|_| IggyError::CannotReadFile)?;
+        if let Err(e) = cursor.get_mut().read_exact(&mut offset) {
+            warn!(
+                "{COMPONENT} (error: {e}) - failed to read consumer offset from file \
+                 (truncated or corrupt?), path: {path}, skipping."
+            );
+            continue;
+        }
         let offset = AtomicU64::new(u64::from_le_bytes(offset));
 
         consumer_offsets.push(ConsumerOffset {
@@ -232,24 +260,44 @@ pub fn load_consumer_group_offsets(
     let mut consumer_group_offsets = Vec::new();
     let dir_entries = dir_entries.unwrap();
     for dir_entry in dir_entries {
-        let dir_entry = dir_entry.unwrap();
-        let metadata = dir_entry.metadata();
-        if metadata.is_err() {
-            break;
-        }
+        let dir_entry = match dir_entry {
+            Ok(entry) => entry,
+            Err(e) => {
+                warn!(
+                    "Failed to read directory entry in consumer group offsets path: {path}, \
+                     error: {e}, skipping."
+                );
+                continue;
+            }
+        };
+
+        let metadata = match dir_entry.metadata() {
+            Ok(m) => m,
+            Err(e) => {
+                warn!(
+                    "Failed to read metadata for entry in consumer group offsets path: {path}, \
+                     error: {e}, skipping."
+                );
+                continue;
+            }
+        };
 
-        if metadata.unwrap().is_dir() {
+        if metadata.is_dir() {
             continue;
         }
 
-        let name = dir_entry.file_name().into_string().unwrap();
+        let name = dir_entry.file_name().to_string_lossy().to_string();
 
-        let consumer_group_id = name.parse::<u32>().unwrap_or_else(|_| {
-            panic!(
-                "Invalid consumer group ID in consumer group file with name: '{}'.",
-                name
-            );
-        });
+        let consumer_group_id = match name.parse::<u32>() {
+            Ok(id) => id,
+            Err(_) => {
+                warn!(
+                    "Unexpected non-numeric consumer group offset file: '{}', skipping.",
+                    name
+                );
+                continue;
+            }
+        };
         let consumer_group_id = ConsumerGroupId(consumer_group_id as usize);
 
         let path = dir_entry.path();
@@ -263,19 +311,25 @@ pub fn load_consumer_group_offsets(
         }
 
         let path = path.unwrap().to_string();
-        let file = std::fs::File::open(&path)
-            .error(|e: &std::io::Error| {
-                format!("{COMPONENT} (error: {e}) - failed to open offset file, path: {path}")
-            })
-            .map_err(|_| IggyError::CannotReadFile)?;
+        let file = match std::fs::File::open(&path) {
+            Ok(f) => f,
+            Err(e) => {
+                warn!(
+                    "{COMPONENT} (error: {e}) - failed to open offset file, \
+                     path: {path}, skipping."
+                );
+                continue;
+            }
+        };
         let mut cursor = std::io::Cursor::new(file);
         let mut offset = [0; 8];
-        cursor
-            .get_mut().read_exact(&mut offset)
-            .error(|e: &std::io::Error| {
-                format!("{COMPONENT} (error: {e}) - failed to read consumer group offset from file, path: {path}")
-            })
-            .map_err(|_| IggyError::CannotReadFile)?;
+        if let Err(e) = cursor.get_mut().read_exact(&mut offset) {
+            warn!(
+                "{COMPONENT} (error: {e}) - failed to read consumer group offset from file \
+                 (truncated or corrupt?), path: {path}, skipping."
+            );
+            continue;
+        }
         let offset = AtomicU64::new(u64::from_le_bytes(offset));
 
         let consumer_offset = ConsumerOffset {

Reply via email to