This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fb20b62a fix(server): broadcast PAT events from HTTP handlers to all 
shards (#2589)
1fb20b62a is described below

commit 1fb20b62a1ca0cbce80c7dbcdb2bbc063c8e6668
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jan 20 14:05:30 2026 +0100

    fix(server): broadcast PAT events from HTTP handlers to all shards (#2589)
    
    TCP connections land on random shards via SO_REUSEPORT,
    but HTTP runs only on shard 0. Without cross-shard broadcast,
    PATs created/deleted via HTTP were invisible to TCP clients on
    other shards.
---
 .../scenarios/cross_protocol_pat_scenario.rs       | 233 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/integration/tests/server/specific.rs          |  23 +-
 core/server/src/http/personal_access_tokens.rs     |  34 ++-
 4 files changed, 288 insertions(+), 3 deletions(-)

diff --git 
a/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs 
b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
new file mode 100644
index 000000000..f8226daf8
--- /dev/null
+++ b/core/integration/tests/server/scenarios/cross_protocol_pat_scenario.rs
@@ -0,0 +1,233 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test for HTTP handlers not broadcasting PAT events to other 
shards.
+//! TCP connections may land on any shard via SO_REUSEPORT, so without 
broadcast
+//! they won't see PATs created via HTTP (which runs only on shard 0).
+
+use iggy::prelude::*;
+use integration::{
+    http_client::HttpClientFactory,
+    tcp_client::TcpClientFactory,
+    test_server::{ClientFactory, IpAddrKind, TestServer, login_root},
+};
+use std::collections::HashMap;
+
+const PAT_NAME: &str = "cross-protocol-test-pat";
+const TCP_CLIENT_COUNT: usize = 20;
+
+/// Regression scenario: create one PAT via HTTP, then list PATs over `TCP_CLIENT_COUNT` fresh TCP connections; panics unless every connection sees exactly that PAT.
+pub async fn run() {
+    let mut extra_envs = HashMap::new(); // extra environment entries passed to `TestServer::new` below
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let http_addr = test_server
+        .get_http_api_addr()
+        .expect("HTTP address not available");
+    let tcp_addr = test_server
+        .get_raw_tcp_addr()
+        .expect("TCP address not available");
+
+    let http_factory = HttpClientFactory {
+        server_addr: http_addr,
+    };
+    let http_client = create_client(&http_factory).await;
+    login_root(&http_client).await;
+
+    let created_pat = http_client
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via HTTP");
+
+    assert!(!created_pat.token.is_empty()); // creation must hand back a non-empty token
+
+    let http_pats = http_client
+        .get_personal_access_tokens()
+        .await
+        .expect("Failed to get PATs via HTTP");
+    assert_eq!(http_pats.len(), 1); // sanity check: the creating protocol itself lists exactly one PAT
+    assert_eq!(http_pats[0].name, PAT_NAME);
+
+    let tcp_factory = TcpClientFactory {
+        server_addr: tcp_addr,
+        ..Default::default()
+    };
+
+    let mut failures = Vec::new(); // per-client mismatch messages, reported together by the final assert
+
+    for i in 0..TCP_CLIENT_COUNT { // a brand-new TCP client and root login on every iteration
+        let tcp_client = create_client(&tcp_factory).await;
+        login_root(&tcp_client).await;
+
+        let tcp_pats = tcp_client
+            .get_personal_access_tokens()
+            .await
+            .expect("Failed to get PATs via TCP");
+
+        if tcp_pats.is_empty() { // "saw nothing" gets its own message, separate from the generic wrong-count case
+            failures.push(format!("TCP client {} saw 0 PATs (expected 1)", i));
+        } else if tcp_pats.len() != 1 {
+            failures.push(format!(
+                "TCP client {} saw {} PATs (expected 1)",
+                i,
+                tcp_pats.len()
+            ));
+        } else if tcp_pats[0].name != PAT_NAME { // exactly one PAT listed, but not the one created above
+            failures.push(format!(
+                "TCP client {} saw wrong PAT name: {}",
+                i, tcp_pats[0].name
+            ));
+        }
+    }
+
+    let _ = http_client.delete_personal_access_token(PAT_NAME).await; // best-effort cleanup: result discarded, and it runs before the assert so it happens even when failures were recorded
+
+    assert!(
+        failures.is_empty(),
+        "Cross-protocol PAT visibility failures ({}/{} clients failed):\n{}",
+        failures.len(),
+        TCP_CLIENT_COUNT,
+        failures.join("\n")
+    );
+}
+
+/// Create PAT via TCP, list via HTTP. Should work since TCP handlers 
broadcast.
+pub async fn run_tcp_to_http() {
+    let mut extra_envs = HashMap::new(); // extra environment entries passed to `TestServer::new` below
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let http_addr = test_server
+        .get_http_api_addr()
+        .expect("HTTP address not available");
+    let tcp_addr = test_server
+        .get_raw_tcp_addr()
+        .expect("TCP address not available");
+
+    let tcp_factory = TcpClientFactory {
+        server_addr: tcp_addr,
+        ..Default::default()
+    };
+    let tcp_client = create_client(&tcp_factory).await; // in this scenario the PAT is created over TCP
+    login_root(&tcp_client).await;
+
+    let created_pat = tcp_client
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via TCP");
+
+    assert!(!created_pat.token.is_empty()); // creation must hand back a non-empty token
+
+    let http_factory = HttpClientFactory {
+        server_addr: http_addr,
+    };
+    let http_client = create_client(&http_factory).await; // HTTP client is only created after the PAT already exists
+    login_root(&http_client).await;
+
+    let http_pats = http_client
+        .get_personal_access_tokens()
+        .await
+        .expect("Failed to get PATs via HTTP");
+
+    assert_eq!(http_pats.len(), 1); // the HTTP listing must contain exactly the PAT created over TCP
+    assert_eq!(http_pats[0].name, PAT_NAME);
+
+    let _ = tcp_client.delete_personal_access_token(PAT_NAME).await; // best-effort cleanup; the result is intentionally discarded
+}
+
+/// Create a PAT via TCP, delete it via HTTP, then panic if any of `TCP_CLIENT_COUNT` fresh TCP connections still lists a PAT.
+pub async fn run_delete_visibility() {
+    let mut extra_envs = HashMap::new(); // extra environment entries passed to `TestServer::new` below
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let http_addr = test_server
+        .get_http_api_addr()
+        .expect("HTTP address not available");
+    let tcp_addr = test_server
+        .get_raw_tcp_addr()
+        .expect("TCP address not available");
+
+    let tcp_factory = TcpClientFactory {
+        server_addr: tcp_addr,
+        ..Default::default()
+    };
+    let tcp_client = create_client(&tcp_factory).await;
+    login_root(&tcp_client).await;
+
+    tcp_client // only success is checked here; the returned token value is dropped
+        .create_personal_access_token(PAT_NAME, IggyExpiry::NeverExpire)
+        .await
+        .expect("Failed to create PAT via TCP");
+
+    let http_factory = HttpClientFactory {
+        server_addr: http_addr,
+    };
+    let http_client = create_client(&http_factory).await;
+    login_root(&http_client).await;
+
+    http_client // the delete itself must succeed, otherwise the scenario panics here
+        .delete_personal_access_token(PAT_NAME)
+        .await
+        .expect("Failed to delete PAT via HTTP");
+
+    let mut still_visible_count = 0; // number of fresh TCP connections that still list a PAT after the delete
+    for _ in 0..TCP_CLIENT_COUNT { // a brand-new TCP client and root login on every iteration
+        let tcp_check_client = create_client(&tcp_factory).await;
+        login_root(&tcp_check_client).await;
+
+        let tcp_pats = tcp_check_client
+            .get_personal_access_tokens()
+            .await
+            .expect("Failed to get PATs via TCP");
+
+        if !tcp_pats.is_empty() { // any listed PAT counts as "still visible"; the name is not compared
+            still_visible_count += 1;
+        }
+    }
+
+    assert_eq!(
+        still_visible_count, 0,
+        "Deleted PAT still visible to {} TCP clients",
+        still_visible_count
+    );
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient { // shared helper: builds an `IggyClient` from whichever transport factory is passed in
+    let client = client_factory.create_client().await; // transport-specific client produced by the factory
+    IggyClient::create(client, None, None) // NOTE(review): the two `None` arguments look like optional settings left unset — confirm against `IggyClient::create`
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index e61594a91..58cc8a3b2 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -26,6 +26,7 @@ pub mod 
consumer_group_with_multiple_clients_polling_messages_scenario;
 pub mod consumer_group_with_single_client_polling_messages_scenario;
 pub mod consumer_timestamp_polling_scenario;
 pub mod create_message_payload;
+pub mod cross_protocol_pat_scenario;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod get_messages_by_offset_api;
diff --git a/core/integration/tests/server/specific.rs 
b/core/integration/tests/server/specific.rs
index 9a59ce468..17cc6b05e 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,8 +17,9 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, 
segment_rotation_race_scenario,
-    single_message_per_batch_scenario, tcp_tls_scenario, 
websocket_tls_scenario,
+    cross_protocol_pat_scenario, delete_segments_scenario, 
message_size_scenario,
+    segment_rotation_race_scenario, single_message_per_batch_scenario, 
tcp_tls_scenario,
+    websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
@@ -292,3 +293,21 @@ async fn segment_rotation_scenario() {
 
     segment_rotation_race_scenario::run(&factories).await;
 }
+
+#[tokio::test]
+#[parallel]
+async fn should_see_pat_created_via_http_when_listing_via_tcp() {
+    cross_protocol_pat_scenario::run().await; // HTTP-create -> TCP-list scenario; the scenario itself panics on any mismatch
+}
+
+#[tokio::test]
+#[parallel]
+async fn should_see_pat_created_via_tcp_when_listing_via_http() {
+    cross_protocol_pat_scenario::run_tcp_to_http().await; // TCP-create -> HTTP-list scenario; the scenario itself panics on any mismatch
+}
+
+#[tokio::test]
+#[parallel]
+async fn should_not_see_pat_deleted_via_http_when_listing_via_tcp() {
+    cross_protocol_pat_scenario::run_delete_visibility().await; // TCP-create -> HTTP-delete -> TCP-list scenario; panics if the deleted PAT is still listed
+}
diff --git a/core/server/src/http/personal_access_tokens.rs 
b/core/server/src/http/personal_access_tokens.rs
index 94d89bdb4..a8244d160 100644
--- a/core/server/src/http/personal_access_tokens.rs
+++ b/core/server/src/http/personal_access_tokens.rs
@@ -84,7 +84,7 @@ async fn create_personal_access_token(
     Json(command): Json<CreatePersonalAccessToken>,
 ) -> Result<Json<RawPersonalAccessToken>, CustomError> {
     command.validate()?;
-    let (_personal_access_token, token) = state
+    let (personal_access_token, token) = state
         .shard
         .create_personal_access_token(identity.user_id, &command.name, 
command.expiry)
         .error(|e: &IggyError| {
@@ -94,6 +94,21 @@ async fn create_personal_access_token(
             )
         })?;
 
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::CreatedPersonalAccessToken {
+                personal_access_token: personal_access_token.clone(),
+            };
+            let _ = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     let token_hash = PersonalAccessToken::hash_token(&token);
     let command = 
EntryCommand::CreatePersonalAccessToken(CreatePersonalAccessTokenWithHash {
         command,
@@ -119,6 +134,7 @@ async fn delete_personal_access_token(
     Extension(identity): Extension<Identity>,
     Path(name): Path<String>,
 ) -> Result<StatusCode, CustomError> {
+    let token_name = name.clone();
     state
         .shard
         .delete_personal_access_token(identity.user_id, &name)
@@ -129,6 +145,22 @@ async fn delete_personal_access_token(
             )
         })?;
 
+    {
+        let broadcast_future = SendWrapper::new(async {
+            use crate::shard::transmission::event::ShardEvent;
+            let event = ShardEvent::DeletedPersonalAccessToken {
+                user_id: identity.user_id,
+                name: token_name,
+            };
+            let _ = state
+                .shard
+                .shard()
+                .broadcast_event_to_all_shards(event)
+                .await;
+        });
+        broadcast_future.await;
+    }
+
     let command = 
EntryCommand::DeletePersonalAccessToken(DeletePersonalAccessToken { name });
     let state_future =
         SendWrapper::new(state.shard.shard().state.apply(identity.user_id, 
&command));

Reply via email to