This is an automated email from the ASF dual-hosted git repository.
hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a86be3a61 fix(server): warn and skip non-numeric filenames in consumer
offset directories instead of panicking (#3135)
a86be3a61 is described below
commit a86be3a6150f91965da4a31272bd7aa17cdcb611
Author: Atharva Lade <[email protected]>
AuthorDate: Mon May 18 22:01:31 2026 -0500
fix(server): warn and skip non-numeric filenames in consumer offset
directories instead of panicking (#3135)
---
core/integration/tests/mod.rs | 1 +
core/integration/tests/storage/consumer_offsets.rs | 180 +++++++++++++++++++++
core/integration/tests/storage/mod.rs | 19 +++
core/server/src/streaming/partitions/storage.rs | 148 +++++++++++------
4 files changed, 301 insertions(+), 47 deletions(-)
diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index 7c543a333..c46be600b 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -40,6 +40,7 @@ mod mcp;
mod sdk;
mod server;
mod state;
+mod storage;
lazy_static! {
static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/storage/consumer_offsets.rs
b/core/integration/tests/storage/consumer_offsets.rs
new file mode 100644
index 000000000..0fbc628da
--- /dev/null
+++ b/core/integration/tests/storage/consumer_offsets.rs
@@ -0,0 +1,180 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy_common::{ConsumerKind, IggyError};
+use server::streaming::partitions::storage::{load_consumer_group_offsets,
load_consumer_offsets};
+use std::path::Path;
+use std::sync::atomic::Ordering;
+
+fn write_offset_file(dir: &Path, name: &str, offset: u64) {
+ std::fs::write(dir.join(name), offset.to_le_bytes()).unwrap();
+}
+
+#[test]
+fn load_consumer_offsets_valid_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), "1", 100);
+ write_offset_file(dir.path(), "2", 200);
+ write_offset_file(dir.path(), "3", 300);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 3);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 100);
+ assert_eq!(offsets[0].kind, ConsumerKind::Consumer);
+ assert_eq!(offsets[1].consumer_id, 2);
+ assert_eq!(offsets[1].offset.load(Ordering::Relaxed), 200);
+ assert_eq!(offsets[2].consumer_id, 3);
+ assert_eq!(offsets[2].offset.load(Ordering::Relaxed), 300);
+}
+
+#[test]
+fn load_consumer_offsets_skips_non_numeric_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), ".DS_Store", 0);
+ write_offset_file(dir.path(), "backup.bak", 0);
+ write_offset_file(dir.path(), "1", 42);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 42);
+}
+
+#[test]
+fn load_consumer_offsets_skips_truncated_files() {
+ let dir = tempfile::tempdir().unwrap();
+ std::fs::write(dir.path().join("1"), [0u8; 3]).unwrap();
+ std::fs::write(dir.path().join("2"), []).unwrap();
+ write_offset_file(dir.path(), "3", 500);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].consumer_id, 3);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 500);
+}
+
+#[test]
+fn load_consumer_offsets_empty_dir() {
+ let dir = tempfile::tempdir().unwrap();
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert!(offsets.is_empty());
+}
+
+#[test]
+fn load_consumer_offsets_skips_directories() {
+ let dir = tempfile::tempdir().unwrap();
+ std::fs::create_dir(dir.path().join("123")).unwrap();
+ write_offset_file(dir.path(), "1", 77);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 77);
+}
+
+#[test]
+fn load_consumer_offsets_nonexistent_dir() {
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().to_str().unwrap().to_string();
+ drop(dir);
+
+ let result = load_consumer_offsets(&path);
+
+ assert!(result.is_err());
+ assert!(matches!(
+ result.unwrap_err(),
+ IggyError::CannotReadConsumerOffsets(_)
+ ));
+}
+
+#[test]
+fn load_consumer_group_offsets_valid_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), "1", 500);
+ write_offset_file(dir.path(), "2", 600);
+
+ let offsets =
load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 2);
+ for (group_id, offset) in &offsets {
+ assert_eq!(offset.kind, ConsumerKind::ConsumerGroup);
+ assert_eq!(offset.consumer_id, group_id.0 as u32);
+ }
+ let ids: Vec<u32> = offsets.iter().map(|(_, co)| co.consumer_id).collect();
+ assert!(ids.contains(&1));
+ assert!(ids.contains(&2));
+}
+
+#[test]
+fn load_consumer_group_offsets_skips_non_numeric_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), ".DS_Store", 0);
+ write_offset_file(dir.path(), "notes.txt", 0);
+ write_offset_file(dir.path(), "5", 999);
+
+ let offsets =
load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].0.0, 5);
+ assert_eq!(offsets[0].1.consumer_id, 5);
+ assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 999);
+}
+
+#[test]
+fn load_consumer_group_offsets_skips_truncated_files() {
+ let dir = tempfile::tempdir().unwrap();
+ std::fs::write(dir.path().join("1"), [0u8; 4]).unwrap();
+ write_offset_file(dir.path(), "2", 750);
+
+ let offsets =
load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].0.0, 2);
+ assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 750);
+}
+
+#[test]
+fn load_consumer_group_offsets_empty_dir() {
+ let dir = tempfile::tempdir().unwrap();
+
+ let offsets =
load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert!(offsets.is_empty());
+}
+
+#[test]
+fn load_consumer_group_offsets_nonexistent_dir() {
+ let dir = tempfile::tempdir().unwrap();
+ let path = dir.path().to_str().unwrap().to_string();
+ drop(dir);
+
+ let result = load_consumer_group_offsets(&path);
+
+ assert!(result.is_err());
+ assert!(matches!(
+ result.unwrap_err(),
+ IggyError::CannotReadConsumerOffsets(_)
+ ));
+}
diff --git a/core/integration/tests/storage/mod.rs
b/core/integration/tests/storage/mod.rs
new file mode 100644
index 000000000..8d4bac8b0
--- /dev/null
+++ b/core/integration/tests/storage/mod.rs
@@ -0,0 +1,19 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod consumer_offsets;
diff --git a/core/server/src/streaming/partitions/storage.rs
b/core/server/src/streaming/partitions/storage.rs
index 13f4011f3..e641b3ec3 100644
--- a/core/server/src/streaming/partitions/storage.rs
+++ b/core/server/src/streaming/partitions/storage.rs
@@ -25,10 +25,9 @@ use compio::{
fs::{self, OpenOptions, create_dir_all},
io::AsyncWriteAtExt,
};
-use err_trail::ErrContext;
use iggy_common::{ConsumerKind, IggyError};
use std::{io::Read, path::Path, sync::atomic::AtomicU64};
-use tracing::{error, trace};
+use tracing::{error, trace, warn};
pub async fn create_partition_file_hierarchy(
stream_id: usize,
@@ -170,20 +169,43 @@ pub fn load_consumer_offsets(path: &str) ->
Result<Vec<ConsumerOffset>, IggyErro
let mut consumer_offsets = Vec::new();
let dir_entries = dir_entries.unwrap();
for dir_entry in dir_entries {
- let dir_entry = dir_entry.unwrap();
- let metadata = dir_entry.metadata();
- if metadata.is_err() {
- break;
- }
+ let dir_entry = match dir_entry {
+ Ok(entry) => entry,
+ Err(e) => {
+ warn!(
+ "Failed to read directory entry in consumer offsets path:
{path}, \
+ error: {e}, skipping."
+ );
+ continue;
+ }
+ };
+
+ let metadata = match dir_entry.metadata() {
+ Ok(m) => m,
+ Err(e) => {
+ warn!(
+ "Failed to read metadata for entry in consumer offsets
path: {path}, \
+ error: {e}, skipping."
+ );
+ continue;
+ }
+ };
- if metadata.unwrap().is_dir() {
+ if metadata.is_dir() {
continue;
}
- let name = dir_entry.file_name().into_string().unwrap();
- let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
- panic!("Invalid consumer ID file with name: '{}'.", name);
- });
+ let name = dir_entry.file_name().to_string_lossy().to_string();
+ let consumer_id = match name.parse::<u32>() {
+ Ok(id) => id,
+ Err(_) => {
+ warn!(
+ "Unexpected non-numeric consumer offset file: '{}',
skipping.",
+ name
+ );
+ continue;
+ }
+ };
let path = dir_entry.path();
let path = path.to_str();
@@ -193,19 +215,25 @@ pub fn load_consumer_offsets(path: &str) ->
Result<Vec<ConsumerOffset>, IggyErro
}
let path = path.unwrap().to_string();
- let file = std::fs::File::open(&path)
- .error(|e: &std::io::Error| {
- format!("{COMPONENT} (error: {e}) - failed to open offset
file, path: {path}")
- })
- .map_err(|_| IggyError::CannotReadFile)?;
+ let file = match std::fs::File::open(&path) {
+ Ok(f) => f,
+ Err(e) => {
+ warn!(
+ "{COMPONENT} (error: {e}) - failed to open offset file, \
+ path: {path}, skipping."
+ );
+ continue;
+ }
+ };
let mut cursor = std::io::Cursor::new(file);
let mut offset = [0; 8];
- cursor
- .get_mut().read_exact(&mut offset)
- .error(|e: &std::io::Error| {
- format!("{COMPONENT} (error: {e}) - failed to read consumer
offset from file, path: {path}")
- })
- .map_err(|_| IggyError::CannotReadFile)?;
+ if let Err(e) = cursor.get_mut().read_exact(&mut offset) {
+ warn!(
+ "{COMPONENT} (error: {e}) - failed to read consumer offset
from file \
+ (truncated or corrupt?), path: {path}, skipping."
+ );
+ continue;
+ }
let offset = AtomicU64::new(u64::from_le_bytes(offset));
consumer_offsets.push(ConsumerOffset {
@@ -232,24 +260,44 @@ pub fn load_consumer_group_offsets(
let mut consumer_group_offsets = Vec::new();
let dir_entries = dir_entries.unwrap();
for dir_entry in dir_entries {
- let dir_entry = dir_entry.unwrap();
- let metadata = dir_entry.metadata();
- if metadata.is_err() {
- break;
- }
+ let dir_entry = match dir_entry {
+ Ok(entry) => entry,
+ Err(e) => {
+ warn!(
+ "Failed to read directory entry in consumer group offsets
path: {path}, \
+ error: {e}, skipping."
+ );
+ continue;
+ }
+ };
+
+ let metadata = match dir_entry.metadata() {
+ Ok(m) => m,
+ Err(e) => {
+ warn!(
+ "Failed to read metadata for entry in consumer group
offsets path: {path}, \
+ error: {e}, skipping."
+ );
+ continue;
+ }
+ };
- if metadata.unwrap().is_dir() {
+ if metadata.is_dir() {
continue;
}
- let name = dir_entry.file_name().into_string().unwrap();
+ let name = dir_entry.file_name().to_string_lossy().to_string();
- let consumer_group_id = name.parse::<u32>().unwrap_or_else(|_| {
- panic!(
- "Invalid consumer group ID in consumer group file with name:
'{}'.",
- name
- );
- });
+ let consumer_group_id = match name.parse::<u32>() {
+ Ok(id) => id,
+ Err(_) => {
+ warn!(
+ "Unexpected non-numeric consumer group offset file: '{}',
skipping.",
+ name
+ );
+ continue;
+ }
+ };
let consumer_group_id = ConsumerGroupId(consumer_group_id as usize);
let path = dir_entry.path();
@@ -263,19 +311,25 @@ pub fn load_consumer_group_offsets(
}
let path = path.unwrap().to_string();
- let file = std::fs::File::open(&path)
- .error(|e: &std::io::Error| {
- format!("{COMPONENT} (error: {e}) - failed to open offset
file, path: {path}")
- })
- .map_err(|_| IggyError::CannotReadFile)?;
+ let file = match std::fs::File::open(&path) {
+ Ok(f) => f,
+ Err(e) => {
+ warn!(
+ "{COMPONENT} (error: {e}) - failed to open offset file, \
+ path: {path}, skipping."
+ );
+ continue;
+ }
+ };
let mut cursor = std::io::Cursor::new(file);
let mut offset = [0; 8];
- cursor
- .get_mut().read_exact(&mut offset)
- .error(|e: &std::io::Error| {
- format!("{COMPONENT} (error: {e}) - failed to read consumer
group offset from file, path: {path}")
- })
- .map_err(|_| IggyError::CannotReadFile)?;
+ if let Err(e) = cursor.get_mut().read_exact(&mut offset) {
+ warn!(
+ "{COMPONENT} (error: {e}) - failed to read consumer group
offset from file \
+ (truncated or corrupt?), path: {path}, skipping."
+ );
+ continue;
+ }
let offset = AtomicU64::new(u64::from_le_bytes(offset));
let consumer_offset = ConsumerOffset {