This is an automated email from the ASF dual-hosted git repository.
maciej pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 9541332d1 fix(server): prevent storing consumer offsets beyond max
partition range (#2794)
9541332d1 is described below
commit 9541332d18bcf83fc0390f8ffc83a645f44ea3d4
Author: omnitrix <[email protected]>
AuthorDate: Tue Feb 24 17:46:07 2026 +0800
fix(server): prevent storing consumer offsets beyond max partition range
(#2794)
closes #2730
---
core/integration/tests/server/general.rs | 18 +-
.../scenarios/invalid_consumer_offset_scenario.rs | 236 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/server/src/shard/system/consumer_offsets.rs | 11 +-
core/server/src/shard/system/utils.rs | 27 +++
.../blocking/ConsumerOffsetsClientBaseTest.java | 7 +
6 files changed, 288 insertions(+), 12 deletions(-)
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
index 13b7c1dfa..f98ac885d 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -17,8 +17,9 @@
use crate::server::scenarios::{
authentication_scenario, bench_scenario,
consumer_timestamp_polling_scenario,
- create_message_payload, message_headers_scenario, permissions_scenario,
snapshot_scenario,
- stream_size_validation_scenario, system_scenario, user_scenario,
+ create_message_payload, invalid_consumer_offset_scenario,
message_headers_scenario,
+ permissions_scenario, snapshot_scenario, stream_size_validation_scenario,
system_scenario,
+ user_scenario,
};
use integration::iggy_harness;
@@ -151,3 +152,16 @@ async fn consumer_timestamp_polling(harness: &TestHarness)
{
async fn snapshot(harness: &TestHarness) {
snapshot_scenario::run(harness).await;
}
+
+#[iggy_harness(
+ test_client_transport = [Tcp, Http, Quic, WebSocket],
+ server(
+ tcp.socket.override_defaults = true,
+ tcp.socket.nodelay = true,
+ quic.max_idle_timeout = "500s",
+ quic.keep_alive_interval = "15s"
+ )
+)]
+async fn invalid_consumer_offset(harness: &TestHarness) {
+ invalid_consumer_offset_scenario::run(harness).await;
+}
diff --git
a/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs
b/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs
new file mode 100644
index 000000000..f612f6850
--- /dev/null
+++
b/core/integration/tests/server/scenarios/invalid_consumer_offset_scenario.rs
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use iggy_common::IggyError;
+use integration::harness::{TestHarness, assert_clean_system};
+
+const STREAM_NAME: &str = "test-stream-offsets";
+const TOPIC_NAME: &str = "test-topic-offsets";
+const PARTITIONS_COUNT: u32 = 1;
+const PARTITION_ID: u32 = 0;
+const CONSUMER_NAME: &str = "test-consumer";
+const CONSUMER_GROUP_NAME: &str = "test-consumer-group-offsets";
+const MESSAGES_COUNT: u32 = 5;
+
+fn assert_invalid_offset_error(err: &IggyError, expected_offset: u64) {
+ let expected_code = IggyError::InvalidOffset(expected_offset).as_code();
+
+ // HTTP client currently doesn't deserialize the ErrorResponse body into
specific IggyError
+ // variants for 400 Bad Request. Until it does, we must manually check the
JSON body for the expected error ID.
+ let is_match = err.as_code() == expected_code
+ || matches!(
+ err,
+ IggyError::HttpResponseError(400, reason)
+ if reason.contains(&format!("\"id\":{}", expected_code))
+ );
+
+ assert!(
+ is_match,
+ "Expected error code {}, got {:?}",
+ expected_code, err
+ );
+}
+
+pub async fn run(harness: &TestHarness) {
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to get root client");
+
+ let stream = Identifier::named(STREAM_NAME).unwrap();
+ let topic = Identifier::named(TOPIC_NAME).unwrap();
+
+ let joined_group = initialize(&client, &stream, &topic).await;
+
+ let consumer = Consumer {
+ kind: ConsumerKind::Consumer,
+ id: Identifier::named(CONSUMER_NAME).unwrap(),
+ };
+ let consumer_group = Consumer {
+ kind: ConsumerKind::ConsumerGroup,
+ id: Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ };
+
+ // 1. Empty partition scenarios
+ test_offset_for_empty_partition(&client, &consumer, &stream, &topic).await;
+ if joined_group {
+ test_offset_for_empty_partition(&client, &consumer_group, &stream,
&topic).await;
+ }
+
+ // 2. Send messages to create non-empty partition scenarios
+ send_messages(&client, &stream, &topic).await;
+
+ // 3. Non-empty partition scenarios
+ test_offset_for_non_empty_partition(&client, &consumer, &stream,
&topic).await;
+ if joined_group {
+ test_offset_for_non_empty_partition(&client, &consumer_group, &stream,
&topic).await;
+ }
+
+ cleanup(&client, &stream, &topic, joined_group).await;
+}
+
+async fn initialize(client: &IggyClient, stream: &Identifier, topic:
&Identifier) -> bool {
+ client.create_stream(STREAM_NAME).await.unwrap();
+ client
+ .create_topic(
+ stream,
+ TOPIC_NAME,
+ PARTITIONS_COUNT,
+ Default::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::Unlimited,
+ )
+ .await
+ .unwrap();
+
+ client
+ .create_consumer_group(stream, topic, CONSUMER_GROUP_NAME)
+ .await
+ .unwrap();
+
+ let join_result = client
+ .join_consumer_group(
+ stream,
+ topic,
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await;
+ match join_result {
+ Ok(_) => true,
+ Err(e) => {
+ assert_eq!(e.as_code(), IggyError::FeatureUnavailable.as_code());
+ false
+ }
+ }
+}
+
+async fn cleanup(client: &IggyClient, stream: &Identifier, topic: &Identifier,
joined_group: bool) {
+ if joined_group {
+ client
+ .leave_consumer_group(
+ stream,
+ topic,
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await
+ .unwrap();
+ }
+
+ client
+ .delete_consumer_group(
+ stream,
+ topic,
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await
+ .unwrap();
+
+ client.delete_stream(stream).await.unwrap();
+ assert_clean_system(client).await;
+}
+
+async fn test_offset_for_empty_partition(
+ client: &IggyClient,
+ consumer: &Consumer,
+ stream: &Identifier,
+ topic: &Identifier,
+) {
+ // Attempt to store offset 0 on a newly created, empty partition.
+ // This operation should fail to prevent prematurely advancing the offset.
+ let err = client
+ .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 0)
+ .await
+ .expect_err("Storing offset 0 on empty partition should fail");
+ assert_invalid_offset_error(&err, 0);
+
+ // Attempt to store current_offset + 1 (should fail with specific error)
+ let err = client
+ .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 1)
+ .await
+ .expect_err("Storing offset 1 on empty partition should fail");
+ assert_invalid_offset_error(&err, 1);
+}
+
+async fn test_offset_for_non_empty_partition(
+ client: &IggyClient,
+ consumer: &Consumer,
+ stream: &Identifier,
+ topic: &Identifier,
+) {
+ // Attempt to store offset less than current_offset (should succeed)
+ // For 5 messages, offsets are 0, 1, 2, 3, 4, so current_offset is 4.
+ client
+ .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID), 2)
+ .await
+ .unwrap();
+
+ let offset_info = client
+ .get_consumer_offset(consumer, stream, topic, Some(PARTITION_ID))
+ .await
+ .unwrap()
+ .expect("Failed to get offset");
+ assert_eq!(offset_info.stored_offset, 2);
+
+ // Attempt to store exactly current_offset (should succeed)
+ let max_offset = (MESSAGES_COUNT - 1) as u64;
+ client
+ .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID),
max_offset)
+ .await
+ .unwrap();
+
+ let offset_info = client
+ .get_consumer_offset(consumer, stream, topic, Some(PARTITION_ID))
+ .await
+ .unwrap()
+ .expect("Failed to get offset");
+ assert_eq!(offset_info.stored_offset, max_offset);
+
+ // Attempt to store current_offset + 1 (should fail)
+ let invalid_offset = max_offset + 1;
+ let err = client
+ .store_consumer_offset(consumer, stream, topic, Some(PARTITION_ID),
invalid_offset)
+ .await
+ .expect_err("Storing offset max_offset + 1 should fail");
+ assert_invalid_offset_error(&err, invalid_offset);
+}
+
+async fn send_messages(client: &IggyClient, stream: &Identifier, topic:
&Identifier) {
+ let mut messages = Vec::new();
+ for offset in 0..MESSAGES_COUNT {
+ messages.push(
+ IggyMessage::builder()
+ .id((offset + 1) as u128)
+ .payload(Bytes::from(format!("message {}", offset)))
+ .build()
+ .expect("Failed to create message"),
+ );
+ }
+ client
+ .send_messages(
+ stream,
+ topic,
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 22b08f596..5a3b3f7a4 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,6 +29,7 @@ pub mod consumer_timestamp_polling_scenario;
pub mod create_message_payload;
pub mod cross_protocol_pat_scenario;
pub mod encryption_scenario;
+pub mod invalid_consumer_offset_scenario;
pub mod log_rotation_scenario;
pub mod message_cleanup_scenario;
pub mod message_headers_scenario;
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index c2572231b..ab287a834 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -51,16 +51,7 @@ impl IggyShard {
return Err(IggyError::NotResolvedConsumer(consumer.id));
};
- if !self
- .metadata
- .partition_exists(topic.stream_id, topic.topic_id, partition_id)
- {
- return Err(IggyError::PartitionNotFound(
- partition_id,
- Identifier::numeric(topic.topic_id as u32).expect("valid topic
id"),
- Identifier::numeric(topic.stream_id as u32).expect("valid
stream id"),
- ));
- }
+ self.validate_partition_offset(topic.stream_id, topic.topic_id,
partition_id, offset)?;
self.store_consumer_offset_base(
topic.stream_id,
diff --git a/core/server/src/shard/system/utils.rs
b/core/server/src/shard/system/utils.rs
index 1fb59a645..fc5f37a5f 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -127,6 +127,33 @@ impl IggyShard {
Ok(())
}
+ /// Validates that consumer_offset does not exceed actual partition offset.
+ pub fn validate_partition_offset(
+ &self,
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ consumer_offset: u64,
+ ) -> Result<(), IggyError> {
+ let partition_stats = self
+ .metadata
+ .get_partition_stats_by_ids(stream_id, topic_id, partition_id)
+ .ok_or(IggyError::PartitionNotFound(
+ partition_id,
+ Identifier::numeric(topic_id as u32).expect("numeric
identifier is always valid"),
+ Identifier::numeric(stream_id as u32).expect("numeric
identifier is always valid"),
+ ))?;
+
+ // Also rejects storing any offset if the partition is completely
empty (i.e., has never contained any messages).
+ if (partition_stats.messages_count_inconsistent() == 0
+ && partition_stats.current_offset() == 0)
+ || consumer_offset > partition_stats.current_offset()
+ {
+ return Err(IggyError::InvalidOffset(consumer_offset));
+ }
+ Ok(())
+ }
+
/// Resolves consumer with partition ID for polling/offset operations.
/// For consumer groups, all lookups happen under a single metadata read
guard.
pub fn resolve_consumer_with_partition_id(
diff --git
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
index f80fc763a..633c84cd8 100644
---
a/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
+++
b/foreign/java/java-sdk/src/test/java/org/apache/iggy/client/blocking/ConsumerOffsetsClientBaseTest.java
@@ -21,10 +21,13 @@ package org.apache.iggy.client.blocking;
import org.apache.iggy.consumergroup.Consumer;
import org.apache.iggy.identifier.ConsumerId;
+import org.apache.iggy.message.Message;
+import org.apache.iggy.message.Partitioning;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.math.BigInteger;
+import java.util.List;
import java.util.Optional;
import static org.apache.iggy.TestConstants.STREAM_NAME;
@@ -45,6 +48,10 @@ public abstract class ConsumerOffsetsClientBaseTest extends
IntegrationTest {
@Test
void shouldGetConsumerOffset() {
+        // given: send a message to ensure the partition is not empty so we can
store offset 0
+ client.messages()
+ .sendMessages(STREAM_NAME, TOPIC_NAME,
Partitioning.partitionId(0L), List.of(Message.of("test")));
+
// when
var consumer = new Consumer(Consumer.Kind.Consumer,
ConsumerId.of(1223L));
consumerOffsetsClient.storeConsumerOffset(STREAM_NAME, TOPIC_NAME,
Optional.empty(), consumer, BigInteger.ZERO);