This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 450e9804707 [fix][txn] PIP-473: v5 transaction timeout applied 1000x 
too long (#25959)
450e9804707 is described below

commit 450e9804707a18f957a013513f9110e067af3ea1
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 08:09:52 2026 -0700

    [fix][txn] PIP-473: v5 transaction timeout applied 1000x too long (#25959)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |   6 +-
 .../client/api/v5/V5TransactionTimeoutTest.java    | 129 +++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   8 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +-
 4 files changed, 141 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8c2dfe8d781..7dff4260d23 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3549,8 +3549,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                 return;
             }
             final String v5Owner = getPrincipal();
+            // txn_ttl_millis is already in milliseconds (the client sends 
unit.toMillis(...)); the v5
+            // coordinator's newTransaction takes milliseconds too, so pass it 
through unchanged.
             service.pulsar().getTransactionCoordinatorV5()
-                    .newTransaction(tcId, command.getTxnTtlSeconds() * 1000L, 
v5Owner)
+                    .newTransaction(tcId, command.getTxnTtlMillis(), v5Owner)
                     .whenComplete((txnId, e) -> {
                         if (e == null) {
                             commandSender.sendNewTxnResponse(requestId, txnId, 
tcId.getId());
@@ -3566,7 +3568,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
         final String owner = getPrincipal();
-        transactionMetadataStoreService.newTransaction(tcId, 
command.getTxnTtlSeconds(), owner)
+        transactionMetadataStoreService.newTransaction(tcId, 
command.getTxnTtlMillis(), owner)
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     log.debug()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
new file mode 100644
index 00000000000..6651900cf3e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTimeoutTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Regression test for the v5 transaction timeout sweep: a transaction that is 
never committed must
+ * be aborted by the broker's timeout sweep once its timeout elapses, unpinning 
the buffer so that a
+ * non-transactional message published after it becomes visible.
+ */
+@CustomLog
+public class V5TransactionTimeoutTest extends MockedPulsarServiceBaseTest {
+
+    private final String myNamespace = "pulsar/txn-timeout";
+    private PulsarClient v5Client;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration config = getDefaultConf();
+        config.setTransactionCoordinatorEnabled(true);
+        config.setTopicLevelPoliciesEnabled(false);
+        
config.setTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds(1);
+        super.internalSetup(config);
+
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+
+        v5Client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofSeconds(3)).build())
+                .build();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        if (v5Client != null) {
+            v5Client.close();
+            v5Client = null;
+        }
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testTimeoutSweepAbortsDanglingTransaction() throws Exception {
+        // Stage 1: confirm this broker actually leads TC partition 0 (the 
sweep only runs there).
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
+                pulsar.getTransactionCoordinatorV5() != null
+                        && 
pulsar.getTransactionCoordinatorV5().isLeaderFor(0));
+        log.info().log("Stage 1 OK: broker leads TC partition 0");
+
+        String topic = "topic://" + myNamespace + "/scalable-" + 
UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(topic, 1);
+
+        @Cleanup
+        Producer<String> producer = 
v5Client.newProducer(Schema.string()).topic(topic).create();
+        Transaction txn = v5Client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("dangling-" + 
i).send();
+        }
+        producer.newMessage().value("sentinel").send();
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("timeout-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        assertNull(consumer.receive(Duration.ofSeconds(1)), "Stage 2: 
everything pinned while txn open");
+        log.info().log("Stage 2 OK: pinned while txn open");
+
+        // Stage 3: the timeout sweep (1s cadence) aborts the dangling txn 
after its 3s timeout.
+        Message<String> msg = null;
+        long deadline = System.currentTimeMillis() + 40_000;
+        while (msg == null && System.currentTimeMillis() < deadline) {
+            msg = consumer.receive(Duration.ofSeconds(2));
+        }
+        assertNotNull(msg, "Stage 3: sentinel must appear once the timeout 
sweep aborts the dangling txn");
+        org.testng.Assert.assertEquals(msg.value(), "sentinel");
+        consumer.acknowledge(msg.id());
+        log.info().log("Stage 3 OK: timeout sweep unpinned the buffer");
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 80b10f47890..2403e85e31a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1435,16 +1435,16 @@ public class Commands {
 
     // ---- transaction related ----
 
-    public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {
-        return newTxn(tcId, requestId, ttlSeconds, false);
+    public static ByteBuf newTxn(long tcId, long requestId, long ttlMillis) {
+        return newTxn(tcId, requestId, ttlMillis, false);
     }
 
-    public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds, 
boolean scalable) {
+    public static ByteBuf newTxn(long tcId, long requestId, long ttlMillis, 
boolean scalable) {
         BaseCommand cmd = localCmd(Type.NEW_TXN);
         cmd.setNewTxn()
                 .setTcId(tcId)
                 .setRequestId(requestId)
-                .setTxnTtlSeconds(ttlSeconds)
+                .setTxnTtlMillis(ttlMillis)
                 .setScalable(scalable);
         return serializeWithSize(cmd);
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index f769512660c..729359894d5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -1109,7 +1109,10 @@ message CommandTcClientConnectResponse {
 
 message CommandNewTxn {
     required uint64 request_id = 1;
-    optional uint64 txn_ttl_seconds = 2 [default = 0];
+    // Transaction timeout in milliseconds. Although this field (number 2) was 
previously named
+    // txn_ttl_seconds, its value has always been carried in milliseconds (the 
client sends
+    // unit.toMillis(...) and both coordinators consume it as ms); it is now 
named accordingly.
+    optional uint64 txn_ttl_millis = 2 [default = 0];
     optional uint64 tc_id = 3 [default = 0];
     // When true, route to the metadata-driven (scalable-topics, PIP-473) 
transaction coordinator
     // instead of the legacy one. Set by v5 clients; absent for v4 clients. 
Lets both coordinators

Reply via email to