This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 1fb20b62a fix(server): broadcast PAT events from HTTP handlers to all
shards (#2589)
1fb20b62a is described below
commit 1fb20b62a1ca0cbce80c7dbcdb2bbc063c8e6668
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jan 20 14:05:30 2026 +0100
fix(server): broadcast PAT events from HTTP handlers to all shards (#2589)
TCP connections land on random shards via SO_REUSEPORT,
but HTTP runs only on shard 0. Without cross-shard broadcast,
PATs created/deleted via HTTP were invisible to TCP clients on
other shards.
---
.../scenarios/cross_protocol_pat_scenario.rs | 233 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/integration/tests/server/specific.rs | 23 +-
core/server/src/http/personal_access_tokens.rs | 34 ++-
4 files changed, 288 insertions(+), 3 deletions(-)
diff --git
a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
new file mode 100644
index 000000000..f8226daf8
--- /dev/null
+++ b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
@@ -0,0 +1,233 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test for HTTP handlers not broadcasting PAT events to other shards.
+//! TCP connections may land on any shard via SO_REUSEPORT, so without broadcast
+//! they won't see PATs created via HTTP (which runs only on shard 0).
+
+use iggy::prelude::*;
+use integration::{
+ http_client::HttpClientFactory,
+ tcp_client::TcpClientFactory,
+ test_server::{ClientFactory, IpAddrKind, TestServer, login_root},
+};
+use std::collections::HashMap;
+
+const PAT_NAME: &str = "cross-protocol-test-pat";
+const TCP_CLIENT_COUNT: usize = 20;
+
+/// Scenario: a PAT created over HTTP must be listed by every TCP connection.
+///
+/// Starts a dedicated test server, creates `PAT_NAME` through the HTTP client,
+/// then opens `TCP_CLIENT_COUNT` fresh TCP clients one after another and lists
+/// the tokens from each. Mismatches are collected instead of asserted on the
+/// spot, so the final panic message reports every failing client at once.
+///
+/// # Panics
+///
+/// Panics if an address is unavailable, if any request fails, or if at least
+/// one TCP client did not see exactly the one expected PAT.
+pub async fn run() {
+    let extra_envs = HashMap::from([
+        (
+            "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+            "true".to_string(),
+        ),
+        ("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string()),
+    ]);
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let http_factory = HttpClientFactory {
+        server_addr: test_server
+            .get_http_api_addr()
+            .expect("HTTP address not available"),
+    };
+    let tcp_factory = TcpClientFactory {
+        server_addr: test_server
+            .get_raw_tcp_addr()
+            .expect("TCP address not available"),
+        ..Default::default()
+    };
+
+    // Create the token over HTTP and confirm HTTP itself can see it.
+    let http_client = create_client(&http_factory).await;
+    login_root(&http_client).await;
+
+    let created_pat = http_client
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via HTTP");
+    assert!(!created_pat.token.is_empty());
+
+    let http_pats = http_client
+        .get_personal_access_tokens()
+        .await
+        .expect("Failed to get PATs via HTTP");
+    assert_eq!(http_pats.len(), 1);
+    assert_eq!(http_pats[0].name, PAT_NAME);
+
+    // Each iteration uses a brand-new TCP connection and lists the tokens.
+    let mut failures = Vec::new();
+    for client_index in 0..TCP_CLIENT_COUNT {
+        let tcp_client = create_client(&tcp_factory).await;
+        login_root(&tcp_client).await;
+
+        let tcp_pats = tcp_client
+            .get_personal_access_tokens()
+            .await
+            .expect("Failed to get PATs via TCP");
+
+        match tcp_pats.as_slice() {
+            [] => failures.push(format!(
+                "TCP client {} saw 0 PATs (expected 1)",
+                client_index
+            )),
+            [only] => {
+                if only.name != PAT_NAME {
+                    failures.push(format!(
+                        "TCP client {} saw wrong PAT name: {}",
+                        client_index, only.name
+                    ));
+                }
+            }
+            several => failures.push(format!(
+                "TCP client {} saw {} PATs (expected 1)",
+                client_index,
+                several.len()
+            )),
+        }
+    }
+
+    // Best-effort cleanup; its outcome is deliberately ignored.
+    let _ = http_client.delete_personal_access_token(PAT_NAME).await;
+
+    assert!(
+        failures.is_empty(),
+        "Cross-protocol PAT visibility failures ({}/{} clients failed):\n{}",
+        failures.len(),
+        TCP_CLIENT_COUNT,
+        failures.join("\n")
+    );
+}
+
+/// Scenario: a PAT created over TCP must be listed over HTTP.
+///
+/// Mirror image of the HTTP-to-TCP check: the token is created through a TCP
+/// client and then listed through a freshly logged-in HTTP client. This
+/// direction should work since TCP handlers broadcast.
+///
+/// # Panics
+///
+/// Panics if an address is unavailable, if any request fails, or if the HTTP
+/// listing is not exactly the one PAT named `PAT_NAME`.
+pub async fn run_tcp_to_http() {
+    let extra_envs = HashMap::from([
+        (
+            "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+            "true".to_string(),
+        ),
+        ("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string()),
+    ]);
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let http_factory = HttpClientFactory {
+        server_addr: test_server
+            .get_http_api_addr()
+            .expect("HTTP address not available"),
+    };
+    let tcp_factory = TcpClientFactory {
+        server_addr: test_server
+            .get_raw_tcp_addr()
+            .expect("TCP address not available"),
+        ..Default::default()
+    };
+
+    // Create the token over TCP.
+    let tcp_client = create_client(&tcp_factory).await;
+    login_root(&tcp_client).await;
+
+    let created_pat = tcp_client
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via TCP");
+    assert!(!created_pat.token.is_empty());
+
+    // List it over HTTP.
+    let http_client = create_client(&http_factory).await;
+    login_root(&http_client).await;
+
+    let http_pats = http_client
+        .get_personal_access_tokens()
+        .await
+        .expect("Failed to get PATs via HTTP");
+    assert_eq!(http_pats.len(), 1);
+    assert_eq!(http_pats[0].name, PAT_NAME);
+
+    // Best-effort cleanup; its outcome is deliberately ignored.
+    let _ = tcp_client.delete_personal_access_token(PAT_NAME).await;
+}
+
+/// Scenario: a PAT deleted over HTTP must disappear for every TCP connection.
+///
+/// The token is created through TCP and deleted through HTTP. Afterwards
+/// `TCP_CLIENT_COUNT` fresh TCP clients each list the tokens, and the number of
+/// clients that still receive a non-empty list is counted.
+///
+/// # Panics
+///
+/// Panics if an address is unavailable, if any request fails, or if any TCP
+/// client still sees a token after the deletion.
+pub async fn run_delete_visibility() {
+    let extra_envs = HashMap::from([
+        (
+            "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+            "true".to_string(),
+        ),
+        ("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string()),
+    ]);
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let http_factory = HttpClientFactory {
+        server_addr: test_server
+            .get_http_api_addr()
+            .expect("HTTP address not available"),
+    };
+    let tcp_factory = TcpClientFactory {
+        server_addr: test_server
+            .get_raw_tcp_addr()
+            .expect("TCP address not available"),
+        ..Default::default()
+    };
+
+    // Create over TCP ...
+    let tcp_client = create_client(&tcp_factory).await;
+    login_root(&tcp_client).await;
+    tcp_client
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via TCP");
+
+    // ... and delete over HTTP.
+    let http_client = create_client(&http_factory).await;
+    login_root(&http_client).await;
+    http_client
+        .delete_personal_access_token(PAT_NAME)
+        .await
+        .expect("Failed to delete PAT via HTTP");
+
+    // Count the fresh TCP connections that still get a non-empty listing.
+    let mut still_visible_count = 0;
+    for _ in 0..TCP_CLIENT_COUNT {
+        let tcp_check_client = create_client(&tcp_factory).await;
+        login_root(&tcp_check_client).await;
+
+        let remaining = tcp_check_client
+            .get_personal_access_tokens()
+            .await
+            .expect("Failed to get PATs via TCP");
+
+        if remaining.is_empty() {
+            continue;
+        }
+        still_visible_count += 1;
+    }
+
+    assert_eq!(
+        still_visible_count, 0,
+        "Deleted PAT still visible to {} TCP clients",
+        still_visible_count
+    );
+}
+
+/// Builds an `IggyClient` around whatever client `client_factory` produces,
+/// passing `None` for both remaining `IggyClient::create` arguments.
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+    IggyClient::create(client_factory.create_client().await, None, None)
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index e61594a91..58cc8a3b2 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -26,6 +26,7 @@ pub mod
consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
pub mod consumer_timestamp_polling_scenario;
pub mod create_message_payload;
+pub mod cross_protocol_pat_scenario;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
pub mod get_messages_by_offset_api;
diff --git a/core/integration/tests/server/specific.rs
b/core/integration/tests/server/specific.rs
index 9a59ce468..17cc6b05e 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,8 +17,9 @@
*/
use crate::server::scenarios::{
- delete_segments_scenario, message_size_scenario,
segment_rotation_race_scenario,
- single_message_per_batch_scenario, tcp_tls_scenario,
websocket_tls_scenario,
+ cross_protocol_pat_scenario, delete_segments_scenario,
message_size_scenario,
+ segment_rotation_race_scenario, single_message_per_batch_scenario,
tcp_tls_scenario,
+ websocket_tls_scenario,
};
use iggy::prelude::*;
use integration::{
@@ -292,3 +293,21 @@ async fn segment_rotation_scenario() {
segment_rotation_race_scenario::run(&factories).await;
}
+
+/// Runs `cross_protocol_pat_scenario::run`: create a PAT via HTTP, then list
+/// it via multiple TCP connections.
+#[tokio::test]
+#[parallel]
+async fn should_see_pat_created_via_http_when_listing_via_tcp() {
+ cross_protocol_pat_scenario::run().await;
+}
+
+/// Runs `cross_protocol_pat_scenario::run_tcp_to_http`: create a PAT via TCP,
+/// then list it via HTTP.
+#[tokio::test]
+#[parallel]
+async fn should_see_pat_created_via_tcp_when_listing_via_http() {
+ cross_protocol_pat_scenario::run_tcp_to_http().await;
+}
+
+/// Runs `cross_protocol_pat_scenario::run_delete_visibility`: create a PAT via
+/// TCP, delete it via HTTP, then verify the deletion is visible via TCP.
+#[tokio::test]
+#[parallel]
+async fn should_not_see_pat_deleted_via_http_when_listing_via_tcp() {
+ cross_protocol_pat_scenario::run_delete_visibility().await;
+}
diff --git a/core/server/src/http/personal_access_tokens.rs
b/core/server/src/http/personal_access_tokens.rs
index 94d89bdb4..a8244d160 100644
--- a/core/server/src/http/personal_access_tokens.rs
+++ b/core/server/src/http/personal_access_tokens.rs
@@ -84,7 +84,7 @@ async fn create_personal_access_token(
Json(command): Json<CreatePersonalAccessToken>,
) -> Result<Json<RawPersonalAccessToken>, CustomError> {
command.validate()?;
- let (_personal_access_token, token) = state
+ let (personal_access_token, token) = state
.shard
.create_personal_access_token(identity.user_id, &command.name,
command.expiry)
.error(|e: &IggyError| {
@@ -94,6 +94,21 @@ async fn create_personal_access_token(
)
})?;
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::CreatedPersonalAccessToken {
+ personal_access_token: personal_access_token.clone(),
+ };
+ let _ = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
let token_hash = PersonalAccessToken::hash_token(&token);
let command =
EntryCommand::CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash {
command,
@@ -119,6 +134,7 @@ async fn delete_personal_access_token(
Extension(identity): Extension<Identity>,
Path(name): Path<String>,
) -> Result<StatusCode, CustomError> {
+ let token_name = name.clone();
state
.shard
.delete_personal_access_token(identity.user_id, &name)
@@ -129,6 +145,22 @@ async fn delete_personal_access_token(
)
})?;
+ {
+ let broadcast_future = SendWrapper::new(async {
+ use crate::shard::transmission::event::ShardEvent;
+ let event = ShardEvent::DeletedPersonalAccessToken {
+ user_id: identity.user_id,
+ name: token_name,
+ };
+ let _ = state
+ .shard
+ .shard()
+ .broadcast_event_to_all_shards(event)
+ .await;
+ });
+ broadcast_future.await;
+ }
+
let command =
EntryCommand::DeletePersonalAccessToken(DeletePersonalAccessToken { name });
let state_future =
SendWrapper::new(state.shard.shard().state.apply(identity.user_id,
&command));