hubcio commented on code in PR #2879: URL: https://github.com/apache/iggy/pull/2879#discussion_r2907457884
########## core/integration/tests/server/scenarios/message_deduplication_scenario.rs: ########## @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup}; +use bytes::Bytes; +use iggy::prelude::*; +use integration::harness::{TestHarness, assert_clean_system}; + +const MESSAGES_PER_BATCH: u32 = 10; + +pub async fn run(harness: &TestHarness) { + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + init_system(&client).await; + + // Step 1: Duplicate rejection + // Send messages with IDs 1..`MESSAGES_PER_BATCH`, then resend the same IDs. + // Only the first batch should be persisted. 
+ let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut original_messages, + ) + .await + .unwrap(); + + let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut duplicate_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH, + "Duplicate messages should have been dropped" + ); + for msg in &polled.messages { + let payload = std::str::from_utf8(&msg.payload).unwrap(); + assert!( + payload.starts_with("original"), + "Expected original payload, got: {payload}" + ); + } + + // Step 2: Unique messages pass through + // Send messages with new IDs (`MESSAGES_PER_BATCH`+1)..(`MESSAGES_PER_BATCH`*2) — these should all be accepted. + let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut new_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 3).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH * 2, + "Unique messages should have been accepted" + ); + + // Step 3: Partial deduplication + // Send a batch where half the IDs are duplicates and half are new. + // IDs `overlap_id_start`..(`overlap_id_start`+`MESSAGES_PER_BATCH`): + // IDs `overlap_id_start`..(`MESSAGES_PER_BATCH`*2) already exist (from Step 2), + // IDs (`MESSAGES_PER_BATCH`*2+1)..(`overlap_id_start`+`MESSAGES_PER_BATCH`) are new. 
+ let overlap_id_start = 16; + let total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1; + // Ensure the `overlap_id_start` is between two batches. + assert!(overlap_id_start > MESSAGES_PER_BATCH && overlap_id_start < 2 * MESSAGES_PER_BATCH); + let mut mixed_messages = build_messages(overlap_id_start, MESSAGES_PER_BATCH, "mixed"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut mixed_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 4).await; + assert_eq!( + polled.messages.len() as u32, + total_after_step3, + "Only new IDs from the mixed batch should have been accepted" + ); Review Comment: steps 3 and 4 only check message counts but never verify payloads. step 1 does this correctly (checks for "original" prefix), but step 3 doesn't verify the 5 surviving messages are actually the new ones (ids 21-25 with "mixed" payload) and not some other combination. same for step 4 -- should check "after-ttl" payloads. count-only assertions can pass even if the wrong messages are kept. ########## core/server/src/shard/system/messages.rs: ########## @@ -356,6 +356,10 @@ impl IggyShard { ) .await; + if batch.count() == 0 { + return Ok(()); + } Review Comment: i get that it's a fix, but you didn't test it. nothing in this test actually exercises it. you'd need a step that sends a batch where every message is a duplicate (e.g. resend ids 1-10 a third time without any new ids mixed in) and verify the server doesn't blow up and the message count stays the same. ########## core/integration/tests/server/scenarios/message_deduplication_scenario.rs: ########## @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup}; +use bytes::Bytes; +use iggy::prelude::*; +use integration::harness::{TestHarness, assert_clean_system}; + +const MESSAGES_PER_BATCH: u32 = 10; + +pub async fn run(harness: &TestHarness) { + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + init_system(&client).await; + + // Step 1: Duplicate rejection + // Send messages with IDs 1..`MESSAGES_PER_BATCH`, then resend the same IDs. + // Only the first batch should be persisted. 
+ let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut original_messages, + ) + .await + .unwrap(); + + let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut duplicate_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH, + "Duplicate messages should have been dropped" + ); Review Comment: after dedup removes messages mid-batch, offsets and indexes get rebuilt via remove_messages(). you never verify that the polled messages actually have correct, contiguous offsets. a bug in the offset recalculation would be a silent data corruption issue and this test wouldn't catch it. ########## core/integration/tests/server/scenarios/message_deduplication_scenario.rs: ########## @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, cleanup}; +use bytes::Bytes; +use iggy::prelude::*; +use integration::harness::{TestHarness, assert_clean_system}; + +const MESSAGES_PER_BATCH: u32 = 10; + +pub async fn run(harness: &TestHarness) { + let client = harness + .root_client() + .await + .expect("Failed to get root client"); + init_system(&client).await; + + // Step 1: Duplicate rejection + // Send messages with IDs 1..`MESSAGES_PER_BATCH`, then resend the same IDs. + // Only the first batch should be persisted. + let mut original_messages = build_messages(1, MESSAGES_PER_BATCH, "original"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut original_messages, + ) + .await + .unwrap(); + + let mut duplicate_messages = build_messages(1, MESSAGES_PER_BATCH, "duplicate"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut duplicate_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 2).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH, + "Duplicate messages should have been dropped" + ); + for msg in &polled.messages { + let payload = std::str::from_utf8(&msg.payload).unwrap(); + assert!( + payload.starts_with("original"), + "Expected original payload, got: {payload}" + ); + } + + // Step 2: Unique messages pass through + // Send messages with new IDs (`MESSAGES_PER_BATCH`+1)..(`MESSAGES_PER_BATCH`*2) — these should all be accepted. 
+ let mut new_messages = build_messages(MESSAGES_PER_BATCH + 1, MESSAGES_PER_BATCH, "new-unique"); + client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(PARTITION_ID), + &mut new_messages, + ) + .await + .unwrap(); + + let polled = poll_all(&client, MESSAGES_PER_BATCH * 3).await; + assert_eq!( + polled.messages.len() as u32, + MESSAGES_PER_BATCH * 2, + "Unique messages should have been accepted" + ); + + // Step 3: Partial deduplication + // Send a batch where half the IDs are duplicates and half are new. + // IDs `overlap_id_start`..(`overlap_id_start`+`MESSAGES_PER_BATCH`): + // IDs `overlap_id_start`..(`MESSAGES_PER_BATCH`*2) already exist (from Step 2), + // IDs (`MESSAGES_PER_BATCH`*2+1)..(`overlap_id_start`+`MESSAGES_PER_BATCH`) are new. + let overlap_id_start = 16; + let total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1; + // Ensure the `overlap_id_start` is between two batches. + assert!(overlap_id_start > MESSAGES_PER_BATCH && overlap_id_start < 2 * MESSAGES_PER_BATCH); Review Comment: overlap_id_start = 16 is a magic number and the formula `total_after_step3 = MESSAGES_PER_BATCH + overlap_id_start - 1` only works because ids happen to be contiguous from 1. consider deriving it from the actual unique id range, e.g. `let new_ids_in_mixed = (overlap_id_start + MESSAGES_PER_BATCH) - (2 * MESSAGES_PER_BATCH + 1)` or just computing the set difference explicitly. also `build_messages` uses 0-based `i` in the payload instead of the actual message id, which makes correlating payloads to ids harder than it needs to be. ########## core/integration/tests/server/scenarios/message_deduplication_scenario.rs: ########## Review Comment: you never test id 0 (auto-generated ids). the server replaces 0 with a uuid before the dedup check (messages_batch_mut.rs:164-166), so sending multiple messages with id 0 should all pass through. 
without this, you could silently break the default send path and never know. after that, please also poll the messages to check whether they have unique message ids. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
