This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-java-contrib.git


The following commit(s) were added to refs/heads/main by this push:
     new 892df92  PCIP-3 Pulsar Extended Transaction API Enhancement Proposal 
(#11)
892df92 is described below

commit 892df9236dd65f29f4c774772db7ab9db7fb2e0d
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Dec 17 23:20:26 2024 +0800

    PCIP-3 Pulsar Extended Transaction API Enhancement Proposal (#11)
    
    ### Motivation
    
    The motivation behind this proposal is to address the issue of duplicate 
message consumption when using Fail-over subscription mode with cumulative Ack 
in Pulsar. Despite the use of transactions, achieving exactly-once semantics 
has been problematic due to the inherent behavior of cumulative Ack. This issue 
is particularly challenging to resolve within the constraints of the Pulsar 
main repository for several reasons:
    
    - **Complexity and Longevity**: Fixing this issue without altering the 
Client API is complex and time-consuming.
    - **Confusing API Usage**: The existing transaction APIs in Pulsar are 
often confusing, with methods like `abort()` and `commit()` appearing 
synchronous but functioning asynchronously. This leads to incorrect usage 
patterns, such as needing to call `abort().get()` and `commit().get()` for 
proper operation.
    - **API Modification Challenges**: Modifying the Client API in the Pulsar 
main repository is difficult due to the uncertainty of whether a new solution 
will be perfect and the lengthy API update cycle.
    
    ### Modifications
    
    The proposed solution involves designing a new transaction API in the 
contributor repository that wraps the original Transaction API. This approach 
offers several benefits:
    
    - **Concise Solution**: By wrapping the original API, the problem can be 
solved in a straightforward and concise manner.
    - **Best Practice Reference**: This wrapping serves as a best practice 
example that does not disrupt existing users' usage while providing a reference 
solution for those encountering similar issues.
    - **Enhanced Usability**: The new API aims to clarify and optimize the 
transaction methods, making them more intuitive and less prone to misuse, thus 
improving the overall usability and clarity of transactional operations in 
Pulsar.
    
    Co-authored-by: xiangying <[email protected]>
---
 pcip/pcip-3.md                                     | 239 ++++++++++++++++++++
 pcip/static/img/pcip-3/cumulative-ack-issue.png    | Bin 0 -> 147332 bytes
 pom.xml                                            |   2 +
 pulsar-transaction-contrib/pom.xml                 |  39 +++-
 .../org/apache/pulsar/txn/api/Transaction.java     | 174 +++++++++++++++
 .../apache/pulsar/txn/api/TransactionFactory.java  |  40 ++++
 .../org/apache/pulsar/txn/api/package-info.java    |  14 ++
 .../apache/pulsar/txn/impl/TransactionImpl.java    | 109 ++++++++++
 .../org/apache/pulsar/txn/impl/package-info.java   |  14 ++
 .../pulsar/txn/SingletonPulsarContainer.java       |  70 ++++++
 .../org/apache/pulsar/txn/TransactionDemo.java     |  82 +++++++
 .../org/apache/pulsar/txn/TransactionImplTest.java | 241 +++++++++++++++++++++
 12 files changed, 1023 insertions(+), 1 deletion(-)

diff --git a/pcip/pcip-3.md b/pcip/pcip-3.md
new file mode 100644
index 0000000..da0466a
--- /dev/null
+++ b/pcip/pcip-3.md
@@ -0,0 +1,239 @@
+# PCIP-3:Pulsar Extended Transaction API Enhancement Proposal
+# Background knowledge
+When users consume messages using the Fail-over subscription mode and confirm 
messages using cumulative Ack, duplicate consumption may occur. In this case, 
even if users use Transaction, they cannot achieve Just-Once.
+As shown in the figure below, in failover mode, the two consumer1 and 
consumer2 started simultaneously frequently undergo two disconnection switches.
+Finally, Consumer1 consumed M1, M2, M4, M5; Consumer2 consumed M1, M2, M3. M1 
and M2 were consumed twice.
+
+![cumulative-ack-issue.png](static/img/pcip-3/cumulative-ack-issue.png)
+
+# Goals
+Solve the problem of cumulative ack consumption duplication by designing a new 
transaction API.
+Why not fix this issue in the Pulsar main repository?
+- The complexity of fixing problems without modifying the Client API is high, 
and the problem-solving cycle is long.
+- There are indeed many confusing usage postures for existing transaction APIs
+  - For example, the abort () and commit () methods may seem synchronous in 
name, but they are actually asynchronous. In actual use, you need to use abort 
().get () and commit ().get ().
+- Modifying the Client API in the Pulsar main repository is a difficult task 
because we cannot determine whether the new solution is necessarily perfect, 
and the time cycle for updating the API is long.
+  The benefit of solving this problem in the contributor repository is:
+- By wrapping the original Transaction API, this problem can be solved in a 
concise way. This wrapping can be seen as a best practice that does not affect 
the use of existing users, while providing a reference solution for users who 
encounter similar problems.
+# High Level Design
+Design a new API to place the context of message sending and consumption 
within Transaction, which not only solves the problem of repeated consumption, 
but also retains sufficient scalability for possible optimization in the future.
+- Solve the problem of repeated consumption - use the function of individual 
ack messagelist to batch ack messages instead of the original cumulative ack.
+- Retained sufficient scalability - sending messages requires using 
Transaction to construct messages, and consumed transaction messages are 
recorded in Transaction. Later, more optimizations can be added using this 
information without changing the interface.
+
+## Public-facing Changes
+### Public API
+The org.apache.pulsar.txn.api.Transaction interface is an optimized and 
extended transaction interface designed to enhance usability and clarity in the 
Pulsar contributors' library. It addresses issues with CumulativeAck and 
transactions not preventing repeated message consumption, and it refines 
ambiguous methods for better clarity. Key features include:
+- Message Recording: Records messages in a transaction without automatic 
acknowledgment.
+- Asynchronous and Synchronous Acknowledgment: Supports both asynchronous and 
synchronous acknowledgment of all received messages for specific consumers or 
across all consumers.
+- Transactional Message Builder: Creates a new transactional message builder 
for a given producer to construct and send messages within a transaction 
context.
+- Committing and Aborting Transactions: Offers both asynchronous and 
synchronous methods to commit or abort transactions, ensuring the effectiveness 
of message sends and acknowledgments.
+- Transaction ID and State Retrieval: Provides methods to retrieve the unique 
transaction ID and its current state to determine the transaction's lifecycle 
phase.
+```java
+/**
+* Interface representing an optimized and extended transaction interface in 
the Pulsar
+* contributors' library.
+*
+* <p>This interface provides enhancements and extensions to the base 
transaction interface in
+* Pulsar. It specifically addresses the issue where using CumulativeAck with 
transactions could not
+* prevent message consumed repeated. Additionally, it clarifies and optimizes 
ambiguous methods for
+* better usability and clarity.
+  */
+  public interface Transaction {
+
+/**
+* Records a message in the transaction.
+*
+* <p>This method is used to include a message in the current transaction. The 
message will not be
+* automatically acknowledged when the transaction is committed. Instead, it 
must be explicitly
+* acknowledged by calling one of the ack methods.
+*
+* @param messageId the ID of the message to record
+* @param consumer the consumer that received the message
+  */
+  void recordMsg(MessageId messageId, Consumer<?> consumer);
+
+/**
+* Asynchronously acknowledges all received messages for a specific consumer in 
the transaction.
+*
+* <p>This method is used to acknowledge all messages that have been recorded 
for the specified
+* consumer in the transaction. The acknowledgment is asynchronous, and the 
future can be used to
+* determine when the operation is complete.
+*
+* @param consumer the consumer that received the messages
+* @return a CompletableFuture that will be completed when the acknowledgment 
is complete
+  */
+  CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer);
+
+/**
+* Acknowledges all received messages for a specific consumer in the 
transaction.
+*
+* <p>This method is a synchronous version of {@link 
#ackAllReceivedMsgsAsync(Consumer)}. It will
+* block until the acknowledgment is complete.
+*
+* @param consumer the consumer that received the messages
+* @throws ExecutionException if the acknowledgment fails
+* @throws InterruptedException if the thread is interrupted while waiting for 
the acknowledgment
+*     to complete
+*/
+void ackAllReceivedMsgs(Consumer<?> consumer) throws ExecutionException, 
InterruptedException;
+
+/**
+* Acknowledges all received messages in the transaction.
+*
+* <p>This method is a convenience method that acknowledges all messages across 
all consumers. It
+* will block until the acknowledgment is complete.
+*
+* @throws ExecutionException if the acknowledgment fails
+* @throws InterruptedException if the thread is interrupted while waiting for 
the acknowledgment
+*     to complete
+*/
+void ackAllReceivedMsgs() throws ExecutionException, InterruptedException;
+
+/**
+* Asynchronously acknowledges all received messages in the transaction.
+*
+* <p>This method is a convenience method that acknowledges all messages across 
all consumers. The
+* acknowledgment is asynchronous, and the future can be used to determine when 
the operation is
+* complete.
+*
+* @return a CompletableFuture that will be completed when the acknowledgment 
is complete
+  */
+  CompletableFuture<Void> ackAllReceivedMsgsAsync();
+
+/**
+* Creates a new transactional message builder for the given producer.
+*
+* <p>This method returns a {@link TypedMessageBuilder} instance that is bound 
to the specified
+* producer and transaction. The returned message builder can be used to 
construct and send
+* messages within the context of a transaction.
+*
+* @param producer the producer instance used to send messages
+* @param <T> the type of messages produced by the producer
+* @return a TypedMessageBuilder instance for building transactional messages
+  */
+  <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> producer);
+
+/**
+* Asynchronously commits the transaction.
+*
+* <p>This method is used to commit the transaction, making all sent messages 
and acknowledgments
+* effective. When the transaction is committed, consumers receive the 
transaction messages and
+* the pending-ack state becomes ack state. The commit is asynchronous, and the 
future can be used
+* to determine when the operation is complete.
+*
+* @return a CompletableFuture that will be completed when the commit is 
complete
+  */
+  CompletableFuture<Void> commitAsync();
+
+/**
+* Asynchronously aborts the transaction.
+*
+* <p>This method is used to abort the transaction, discarding all send 
messages and
+* acknowledgments. The abort is asynchronous, and the future can be used to 
determine when the
+* operation is complete.
+*
+* @return a CompletableFuture that will be completed when the abort is complete
+  */
+  CompletableFuture<Void> abortAsync();
+
+/**
+* Commits the transaction.
+*
+* <p>This method is a synchronous version of {@link #commitAsync()}. It will 
block until the
+* commit is complete.
+*
+* @throws ExecutionException if the commit fails
+* @throws InterruptedException if the thread is interrupted while waiting for 
the commit to
+*     complete
+*/
+void commit() throws ExecutionException, InterruptedException;
+
+/**
+* Aborts the transaction.
+*
+* <p>This method is a synchronous version of {@link #abortAsync()}. It will 
block until the abort
+* is complete.
+*
+* @throws ExecutionException if the abort fails
+* @throws InterruptedException if the thread is interrupted while waiting for 
the abort to
+*     complete
+*/
+void abort() throws ExecutionException, InterruptedException;
+
+/**
+* Gets the transaction ID.
+*
+* <p>This method returns the unique identifier for the transaction.
+*
+* @return the transaction ID
+  */
+  TxnID getTxnID();
+
+/**
+* Gets the current state of the transaction.
+*
+* <p>This method returns the current state of the transaction, which can be 
used to determine if
+* the transaction is open, committed, aborted, error or timeout.
+*
+* @return the current state of the transaction
+  */
+  org.apache.pulsar.client.api.transaction.Transaction.State getState();
+  }
+```  
+
+# Get started
+## Quick Start
+```java
+public void transactionDemo() throws Exception {
+String pubTopic = "persistent://public/default/my-pub-topic";
+String subTopic = "persistent://public/default/my-sub-topic";
+String subscription = "my-subscription";
+
+// Create a Pulsar client instance
+PulsarClient client = SingletonPulsarContainer.createPulsarClient();
+
+// Create a Transaction object
+// Use TransactionFactory to create a transaction object with a timeout of 5 
seconds
+Transaction transaction =
+TransactionFactory.createTransaction(client, 5, TimeUnit.SECONDS).get();
+
+// Create producers and a consumer
+// Create two producers to send messages to different topics
+Producer<String> producerToPubTopic = 
client.newProducer(Schema.STRING).topic(pubTopic).create();
+Producer<String> producerToSubTopic = 
client.newProducer(Schema.STRING).topic(subTopic).create();
+
+// Create a consumer to receive messages from the subTopic
+Consumer<String> consumerFromSubTopic = client
+.newConsumer(Schema.STRING)
+.subscriptionName(subscription)
+.topic(subTopic)
+.subscribe();
+
+// Send a message to the Sub Topic
+producerToSubTopic.send("Hello World");
+
+// Receive a message
+Message<String> receivedMessage = consumerFromSubTopic.receive();
+MessageId receivedMessageId = receivedMessage.getMessageId();
+
+// Record the message in the transaction
+transaction.recordMsg(receivedMessageId, consumerFromSubTopic);
+
+// Forward the transaction message to the pub topic
+// Use the transaction message builder to forward the received message to the 
pubTopic
+transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send();
+
+// Acknowledge all received messages
+// Acknowledge all messages received from the subTopic within the transaction
+transaction.ackAllReceivedMsgs(consumerFromSubTopic);
+
+// Commit the transaction
+// Commit the transaction to ensure all recorded messages and acknowledgments 
take effect
+transaction.commit();
+
+// Close the consumer, producers, and client to release resources
+consumerFromSubTopic.close();
+producerToSubTopic.close();
+client.close();
+}
+```
\ No newline at end of file
diff --git a/pcip/static/img/pcip-3/cumulative-ack-issue.png 
b/pcip/static/img/pcip-3/cumulative-ack-issue.png
new file mode 100644
index 0000000..f64cced
Binary files /dev/null and b/pcip/static/img/pcip-3/cumulative-ack-issue.png 
differ
diff --git a/pom.xml b/pom.xml
index 749f363..e57a6e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,8 @@
         <commons-pool.version>2.12.0</commons-pool.version>
         <awaitility.version>4.2.2</awaitility.version>
         <testcontainers.version>1.20.1</testcontainers.version>
+        <junit.version>4.13.1</junit.version>
+        <mockito.version>5.12.0</mockito.version>
     </properties>
 
     <modules>
diff --git a/pulsar-transaction-contrib/pom.xml 
b/pulsar-transaction-contrib/pom.xml
index c9f0077..01b4501 100644
--- a/pulsar-transaction-contrib/pom.xml
+++ b/pulsar-transaction-contrib/pom.xml
@@ -21,8 +21,45 @@
         <artifactId>pulsar-java-contrib</artifactId>
         <version>1.0.0-SNAPSHOT</version>
     </parent>
-    <inceptionYear>2024</inceptionYear>
 
     <artifactId>pulsar-transaction-contrib</artifactId>
 
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>17</source>
+                    <target>17</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <inceptionYear>2024</inceptionYear>
+
 </project>
diff --git 
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
new file mode 100644
index 0000000..d673032
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
@@ -0,0 +1,174 @@
+package org.apache.pulsar.txn.api;
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+/**
+ * Interface representing an optimized and extended transaction interface in 
the Pulsar
+ * contributors' library.
+ *
+ * <p>This interface provides enhancements and extensions to the base 
transaction interface in
+ * Pulsar. It specifically addresses the issue where using CumulativeAck with 
transactions could not
+ * prevent message consumed repeated. Additionally, it clarifies and optimizes 
ambiguous methods for
+ * better usability and clarity.
+ */
+public interface Transaction {
+
+  /**
+   * Records a message in the transaction.
+   *
+   * <p>This method is used to include a message in the current transaction. 
The message will not be
+   * automatically acknowledged when the transaction is committed. Instead, it 
must be explicitly
+   * acknowledged by calling one of the ack methods.
+   *
+   * @param messageId the ID of the message to record
+   * @param consumer the consumer that received the message
+   */
+  void recordMsg(MessageId messageId, Consumer<?> consumer);
+
+  /**
+   * Asynchronously acknowledges all received messages for a specific consumer 
in the transaction.
+   *
+   * <p>This method is used to acknowledge all messages that have been 
recorded for the specified
+   * consumer in the transaction. The acknowledgment is asynchronous, and the 
future can be used to
+   * determine when the operation is complete.
+   *
+   * @param consumer the consumer that received the messages
+   * @return a CompletableFuture that will be completed when the 
acknowledgment is complete
+   */
+  CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer);
+
+  /**
+   * Acknowledges all received messages for a specific consumer in the 
transaction.
+   *
+   * <p>This method is a synchronous version of {@link 
#ackAllReceivedMsgsAsync(Consumer)}. It will
+   * block until the acknowledgment is complete.
+   *
+   * @param consumer the consumer that received the messages
+   * @throws ExecutionException if the acknowledgment fails
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the acknowledgment
+   *     to complete
+   */
+  void ackAllReceivedMsgs(Consumer<?> consumer) throws ExecutionException, 
InterruptedException;
+
+  /**
+   * Acknowledges all received messages in the transaction.
+   *
+   * <p>This method is a convenience method that acknowledges all messages 
across all consumers. It
+   * will block until the acknowledgment is complete.
+   *
+   * @throws ExecutionException if the acknowledgment fails
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the acknowledgment
+   *     to complete
+   */
+  void ackAllReceivedMsgs() throws ExecutionException, InterruptedException;
+
+  /**
+   * Asynchronously acknowledges all received messages in the transaction.
+   *
+   * <p>This method is a convenience method that acknowledges all messages 
across all consumers. The
+   * acknowledgment is asynchronous, and the future can be used to determine 
when the operation is
+   * complete.
+   *
+   * @return a CompletableFuture that will be completed when the 
acknowledgment is complete
+   */
+  CompletableFuture<Void> ackAllReceivedMsgsAsync();
+
+  /**
+   * Creates a new transactional message builder for the given producer.
+   *
+   * <p>This method returns a {@link TypedMessageBuilder} instance that is 
bound to the specified
+   * producer and transaction. The returned message builder can be used to 
construct and send
+   * messages within the context of a transaction.
+   *
+   * @param producer the producer instance used to send messages
+   * @param <T> the type of messages produced by the producer
+   * @return a TypedMessageBuilder instance for building transactional messages
+   */
+  <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> producer);
+
+  /**
+   * Asynchronously commits the transaction.
+   *
+   * <p>This method is used to commit the transaction, making all sent 
messages and acknowledgments
+   * effective. When the transaction is committed, consumers receive the 
transaction messages and
+   * the pending-ack state becomes ack state. The commit is asynchronous, and 
the future can be used
+   * to determine when the operation is complete.
+   *
+   * @return a CompletableFuture that will be completed when the commit is 
complete
+   */
+  CompletableFuture<Void> commitAsync();
+
+  /**
+   * Asynchronously aborts the transaction.
+   *
+   * <p>This method is used to abort the transaction, discarding all send 
messages and
+   * acknowledgments. The abort is asynchronous, and the future can be used to 
determine when the
+   * operation is complete.
+   *
+   * @return a CompletableFuture that will be completed when the abort is 
complete
+   */
+  CompletableFuture<Void> abortAsync();
+
+  /**
+   * Commits the transaction.
+   *
+   * <p>This method is a synchronous version of {@link #commitAsync()}. It 
will block until the
+   * commit is complete.
+   *
+   * @throws ExecutionException if the commit fails
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the commit to
+   *     complete
+   */
+  void commit() throws ExecutionException, InterruptedException;
+
+  /**
+   * Aborts the transaction.
+   *
+   * <p>This method is a synchronous version of {@link #abortAsync()}. It will 
block until the abort
+   * is complete.
+   *
+   * @throws ExecutionException if the abort fails
+   * @throws InterruptedException if the thread is interrupted while waiting 
for the abort to
+   *     complete
+   */
+  void abort() throws ExecutionException, InterruptedException;
+
+  /**
+   * Gets the transaction ID.
+   *
+   * <p>This method returns the unique identifier for the transaction.
+   *
+   * @return the transaction ID
+   */
+  TxnID getTxnID();
+
+  /**
+   * Gets the current state of the transaction.
+   *
+   * <p>This method returns the current state of the transaction, which can be 
used to determine if
+   * the transaction is open, committed, aborted, error or timeout.
+   *
+   * @return the current state of the transaction
+   */
+  org.apache.pulsar.client.api.transaction.Transaction.State getState();
+}
diff --git 
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
new file mode 100644
index 0000000..b540b64
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.txn.impl.TransactionImpl;
+
+public class TransactionFactory {
+
+  /**
+   * Creates a new transaction with the specified timeout.
+   *
+   * @param pulsarClient the Pulsar client instance
+   * @param timeout the transaction timeout
+   * @param unit the time unit of the timeout
+   * @return a CompletableFuture that will be completed with the new 
transaction
+   */
+  public static CompletableFuture<Transaction> createTransaction(
+      PulsarClient pulsarClient, long timeout, TimeUnit unit) {
+    // Create a transaction with the specified timeout
+    return pulsarClient
+        .newTransaction()
+        .withTransactionTimeout(timeout, unit)
+        .build()
+        .thenApply(TransactionImpl::new);
+  }
+}
diff --git 
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
new file mode 100644
index 0000000..a1cf663
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.api;
diff --git 
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
new file mode 100644
index 0000000..94434cd
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.txn.api.Transaction;
+
+public class TransactionImpl implements Transaction {
+  @Getter
+  ConcurrentHashMap<Consumer<?>, List<MessageId>> receivedMessages = new 
ConcurrentHashMap<>();
+
+  org.apache.pulsar.client.api.transaction.Transaction transaction;
+
+  public TransactionImpl(org.apache.pulsar.client.api.transaction.Transaction 
transaction) {
+    this.transaction = transaction;
+  }
+
+  @Override
+  public void recordMsg(MessageId messageId, Consumer<?> consumer) {
+    receivedMessages.computeIfAbsent(consumer, k -> new 
CopyOnWriteArrayList<>()).add(messageId);
+  }
+
+  @Override
+  public CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer) 
{
+    List<MessageId> messageIds = receivedMessages.remove(consumer);
+    if (messageIds != null) {
+      return consumer.acknowledgeAsync(messageIds, transaction);
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public void ackAllReceivedMsgs(Consumer<?> consumer)
+      throws ExecutionException, InterruptedException {
+    ackAllReceivedMsgsAsync(consumer).get();
+  }
+
+  @Override
+  public void ackAllReceivedMsgs() throws ExecutionException, 
InterruptedException {
+    ackAllReceivedMsgsAsync().get();
+  }
+
+  @Override
+  public CompletableFuture<Void> ackAllReceivedMsgsAsync() {
+    return FutureUtil.waitForAll(
+        receivedMessages.keySet().stream()
+            .map(this::ackAllReceivedMsgsAsync)
+            .collect(Collectors.toList()));
+  }
+
+  @Override
+  public <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> 
producer) {
+    return producer.newMessage(transaction);
+  }
+
+  @Override
+  public CompletableFuture<Void> commitAsync() {
+    return transaction.commit();
+  }
+
+  @Override
+  public CompletableFuture<Void> abortAsync() {
+    return transaction.abort();
+  }
+
+  @Override
+  public void commit() throws ExecutionException, InterruptedException {
+    transaction.commit().get();
+  }
+
+  @Override
+  public void abort() throws ExecutionException, InterruptedException {
+    transaction.abort().get();
+  }
+
+  @Override
+  public TxnID getTxnID() {
+    return transaction.getTxnID();
+  }
+
+  @Override
+  public org.apache.pulsar.client.api.transaction.Transaction.State getState() 
{
+    return transaction.getState();
+  }
+}
diff --git 
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
new file mode 100644
index 0000000..2ec84e4
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.impl;
diff --git 
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
new file mode 100644
index 0000000..afd3bdc
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+public class SingletonPulsarContainer {
+
+    private static final PulsarContainer PULSAR_CONTAINER;
+
+    static {
+        PULSAR_CONTAINER = new PulsarContainer(getPulsarImage())
+                
.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true")
+                .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
+                .withStartupTimeout(Duration.ofMinutes(3));
+        PULSAR_CONTAINER.start();
+    }
+
+    private static DockerImageName getPulsarImage() {
+        return DockerImageName.parse("apachepulsar/pulsar:" + 
getPulsarImageVersion());
+    }
+
+    private static String getPulsarImageVersion() {
+        String pulsarVersion = "";
+        Properties properties = new Properties();
+        try {
+            properties.load(SingletonPulsarContainer.class.getClassLoader()
+                    .getResourceAsStream("pulsar-container.properties"));
+            if (!properties.isEmpty()) {
+                pulsarVersion = properties.getProperty("pulsar.version");
+            }
+        } catch (IOException e) {
+            log.error("Failed to load pulsar version. " + e.getCause());
+        }
+        return pulsarVersion;
+    }
+
+    static PulsarClient createPulsarClient() throws PulsarClientException {
+        return PulsarClient.builder()
+                
.serviceUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getPulsarBrokerUrl())
+                .enableTransaction(true)
+                .build();
+    }
+
+    static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+        return PulsarAdmin.builder()
+                
.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+                .build();
+    }
+}
diff --git 
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
new file mode 100644
index 0000000..308c521
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.txn.api.Transaction;
+import org.apache.pulsar.txn.api.TransactionFactory;
+import org.testng.annotations.Test;
+
+public class TransactionDemo {
+
+  @Test
+  public void transactionDemo() throws Exception {
+    String pubTopic = "persistent://public/default/my-pub-topic";
+    String subTopic = "persistent://public/default/my-sub-topic";
+    String subscription = "my-subscription";
+
+    // Create a Pulsar client instance
+    PulsarClient client = SingletonPulsarContainer.createPulsarClient();
+
+    // Create a Transaction object
+    // Use TransactionFactory to create a transaction object with a timeout of 
5 seconds
+    Transaction transaction =
+            TransactionFactory.createTransaction(client, 5, 
TimeUnit.SECONDS).get();
+
+    // Create producers and a consumer
+    // Create two producers to send messages to different topics
+    Producer<String> producerToPubTopic = 
client.newProducer(Schema.STRING).topic(pubTopic).create();
+    Producer<String> producerToSubTopic = 
client.newProducer(Schema.STRING).topic(subTopic).create();
+
+    // Create a consumer to receive messages from the subTopic
+    Consumer<String> consumerFromSubTopic = client
+            .newConsumer(Schema.STRING)
+            .subscriptionName(subscription)
+            .topic(subTopic)
+            .subscribe();
+
+    // Send a message to the Sub Topic
+    producerToSubTopic.send("Hello World");
+
+    // Receive a message
+    Message<String> receivedMessage = consumerFromSubTopic.receive();
+    MessageId receivedMessageId = receivedMessage.getMessageId();
+
+    // Record the message in the transaction
+    transaction.recordMsg(receivedMessageId, consumerFromSubTopic);
+
+    // Forward the transaction message to the pub topic
+    // Use the transaction message builder to forward the received message to 
the pubTopic
+    
transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send();
+
+    // Acknowledge all received messages
+    // Acknowledge all messages received from the subTopic within the 
transaction
+    transaction.ackAllReceivedMsgs(consumerFromSubTopic);
+
+    // Commit the transaction
+    // Commit the transaction to ensure all recorded messages and 
acknowledgments take effect
+    transaction.commit();
+
+    // Close the consumer, producers, and client to release resources
+    consumerFromSubTopic.close();
+    producerToSubTopic.close();
+    client.close();
+  }
+}
\ No newline at end of file
diff --git 
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
new file mode 100644
index 0000000..e4171a6
--- /dev/null
+++ 
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.txn.impl.TransactionImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TransactionImplTest {
+
+  private TransactionImpl transactionImpl;
+  private org.apache.pulsar.client.api.transaction.Transaction mockTransaction;
+  private List<Consumer<?>> mockConsumers;
+  private List<MessageId> messageIds;
+
+  @BeforeMethod
+  public void setUp() {
+    mockTransaction = 
mock(org.apache.pulsar.client.api.transaction.Transaction.class);
+    mockConsumers = new ArrayList<>();
+    messageIds = new ArrayList<>();
+
+    // Create two mock consumers and two message IDs
+    for (int i = 0; i < 2; i++) {
+      Consumer<?> mockConsumer = mock(Consumer.class);
+      MessageId messageId = mock(MessageId.class);
+      mockConsumers.add(mockConsumer);
+      messageIds.add(messageId);
+    }
+
+    transactionImpl = new TransactionImpl(mockTransaction);
+  }
+
+  @Test
+  public void testRecordMsg() {
+    // Record a message for a consumer
+    Consumer<?> consumer = mockConsumers.get(0);
+    MessageId messageId = messageIds.get(0);
+    transactionImpl.recordMsg(messageId, consumer);
+
+    // Verify the message is recorded for the consumer
+    
assertTrue(transactionImpl.getReceivedMessages().get(consumer).contains(messageId));
+  }
+
+  @Test
+  public void testAckAllReceivedMsgsAsync() throws ExecutionException, 
InterruptedException {
+    // Record messages for different consumers
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      transactionImpl.recordMsg(messageId, consumer);
+    }
+
+    // Mock the acknowledgeAsync method for each consumer
+    for (Consumer<?> consumer : mockConsumers) {
+      when(consumer.acknowledgeAsync(anyList(), any()))
+          .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    // Call the ackAllReceivedMsgsAsync method
+    CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
+    future.get();
+
+    // Verify each consumer called the correct acknowledgeAsync method with 
the correct message IDs
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      verify(consumer).acknowledgeAsync(eq(List.of(messageId)), 
eq(mockTransaction));
+    }
+  }
+
+  @Test
+  public void testAckAllReceivedMsgs() throws ExecutionException, 
InterruptedException {
+    // Record messages for a consumer
+    Consumer<?> consumer = mockConsumers.get(0);
+    MessageId messageId = messageIds.get(0);
+    transactionImpl.recordMsg(messageId, consumer);
+
+    // Mock the acknowledgeAsync method for the consumer
+    when(consumer.acknowledgeAsync(anyList(), any()))
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+    // Call the ackAllReceivedMsgs method
+    transactionImpl.ackAllReceivedMsgs(consumer);
+
+    // Verify the consumer called the correct acknowledgeAsync method with the 
correct message IDs
+    verify(consumer).acknowledgeAsync(eq(List.of(messageId)), 
eq(mockTransaction));
+    // Verify the message Ids were removed from the transaction context after 
acked.
+    assertEquals(transactionImpl.getReceivedMessages().size(), 0);
+  }
+
+  @Test
+  public void testAckAllReceivedMsgsAll() throws ExecutionException, 
InterruptedException {
+    // Record messages for different consumers
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      transactionImpl.recordMsg(messageId, consumer);
+    }
+
+    // Mock the acknowledgeAsync method for each consumer
+    for (Consumer<?> consumer : mockConsumers) {
+      when(consumer.acknowledgeAsync(anyList(), any()))
+          .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    // Call the ackAllReceivedMsgs method
+    transactionImpl.ackAllReceivedMsgs();
+
+    // Verify each consumer called the correct acknowledgeAsync method with 
the correct message IDs
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      verify(consumer).acknowledgeAsync(eq(List.of(messageId)), 
eq(mockTransaction));
+    }
+  }
+
+  @Test
+  public void testAckAllReceivedMsgsAsyncAll() throws ExecutionException, 
InterruptedException {
+    // Record messages for different consumers
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      transactionImpl.recordMsg(messageId, consumer);
+    }
+
+    // Mock the acknowledgeAsync method for each consumer
+    for (Consumer<?> consumer : mockConsumers) {
+      when(consumer.acknowledgeAsync(anyList(), any()))
+          .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    // Call the ackAllReceivedMsgsAsync method
+    CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
+    future.get();
+
+    // Verify each consumer called the correct acknowledgeAsync method with 
the correct message IDs
+    for (int i = 0; i < mockConsumers.size(); i++) {
+      Consumer<?> consumer = mockConsumers.get(i);
+      MessageId messageId = messageIds.get(i);
+      verify(consumer).acknowledgeAsync(eq(List.of(messageId)), 
eq(mockTransaction));
+    }
+  }
+
+  @Test
+  public void testCommitAsync() throws ExecutionException, 
InterruptedException {
+    // Mock the commit method of the transaction
+    
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
+
+    // Call the commitAsync method
+    CompletableFuture<Void> future = transactionImpl.commitAsync();
+    future.get();
+
+    // Verify the commit method was called
+    verify(mockTransaction).commit();
+  }
+
+  @Test
+  public void testAbortAsync() throws ExecutionException, InterruptedException 
{
+    // Mock the abort method of the transaction
+    
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
+
+    // Call the abortAsync method
+    CompletableFuture<Void> future = transactionImpl.abortAsync();
+    future.get();
+
+    // Verify the abort method was called
+    verify(mockTransaction).abort();
+  }
+
+  @Test
+  public void testCommit() throws ExecutionException, InterruptedException {
+    // Mock the commit method of the transaction
+    
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
+
+    // Call the commit method
+    transactionImpl.commit();
+
+    // Verify the commit method was called
+    verify(mockTransaction).commit();
+  }
+
+  @Test
+  public void testAbort() throws ExecutionException, InterruptedException {
+    // Mock the abort method of the transaction
+    
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
+
+    // Call the abort method
+    transactionImpl.abort();
+
+    // Verify the abort method was called
+    verify(mockTransaction).abort();
+  }
+
+  @Test
+  public void testGetTxnID() {
+    // Mock the getTxnID method of the transaction
+    TxnID txnID = mock(TxnID.class);
+    when(mockTransaction.getTxnID()).thenReturn(txnID);
+
+    // Call the getTxnID method
+    assertEquals(txnID, transactionImpl.getTxnID());
+  }
+
+  @Test
+  public void testGetState() {
+    // Mock the getState method of the transaction
+    org.apache.pulsar.client.api.transaction.Transaction.State state =
+        org.apache.pulsar.client.api.transaction.Transaction.State.OPEN;
+    when(mockTransaction.getState()).thenReturn(state);
+
+    // Call the getState method
+    assertEquals(state, transactionImpl.getState());
+  }
+}


Reply via email to