This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-java-contrib.git
The following commit(s) were added to refs/heads/main by this push:
new 892df92 PCIP-3 Pulsar Extended Transaction API Enhancement Proposal
(#11)
892df92 is described below
commit 892df9236dd65f29f4c774772db7ab9db7fb2e0d
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Dec 17 23:20:26 2024 +0800
PCIP-3 Pulsar Extended Transaction API Enhancement Proposal (#11)
### Motivation
The motivation behind this proposal is to address the issue of duplicate
message consumption when using Fail-over subscription mode with cumulative Ack
in Pulsar. Despite the use of transactions, achieving exactly-once semantics
has been problematic due to the inherent behavior of cumulative Ack. This issue
is particularly challenging to resolve within the constraints of the Pulsar
main repository for several reasons:
- **Complexity and Longevity**: Fixing this issue without altering the
Client API is complex and time-consuming.
- **Confusing API Usage**: The existing transaction APIs in Pulsar are
often confusing, with methods like `abort()` and `commit()` appearing
synchronous but functioning asynchronously. This leads to incorrect usage
patterns, such as needing to call `abort().get()` and `commit().get()` for
proper operation.
- **API Modification Challenges**: Modifying the Client API in the Pulsar
main repository is difficult due to the uncertainty of whether a new solution
will be perfect and the lengthy API update cycle.
### Modifications
The proposed solution involves designing a new transaction API in the
contributor repository that wraps the original Transaction API. This approach
offers several benefits:
- **Concise Solution**: By wrapping the original API, the problem can be
solved in a straightforward and concise manner.
- **Best Practice Reference**: This wrapping serves as a best practice
example that does not disrupt existing users' usage while providing a reference
solution for those encountering similar issues.
- **Enhanced Usability**: The new API aims to clarify and optimize the
transaction methods, making them more intuitive and less prone to misuse, thus
improving the overall usability and clarity of transactional operations in
Pulsar.
Co-authored-by: xiangying <[email protected]>
---
pcip/pcip-3.md | 239 ++++++++++++++++++++
pcip/static/img/pcip-3/cumulative-ack-issue.png | Bin 0 -> 147332 bytes
pom.xml | 2 +
pulsar-transaction-contrib/pom.xml | 39 +++-
.../org/apache/pulsar/txn/api/Transaction.java | 174 +++++++++++++++
.../apache/pulsar/txn/api/TransactionFactory.java | 40 ++++
.../org/apache/pulsar/txn/api/package-info.java | 14 ++
.../apache/pulsar/txn/impl/TransactionImpl.java | 109 ++++++++++
.../org/apache/pulsar/txn/impl/package-info.java | 14 ++
.../pulsar/txn/SingletonPulsarContainer.java | 70 ++++++
.../org/apache/pulsar/txn/TransactionDemo.java | 82 +++++++
.../org/apache/pulsar/txn/TransactionImplTest.java | 241 +++++++++++++++++++++
12 files changed, 1023 insertions(+), 1 deletion(-)
diff --git a/pcip/pcip-3.md b/pcip/pcip-3.md
new file mode 100644
index 0000000..da0466a
--- /dev/null
+++ b/pcip/pcip-3.md
@@ -0,0 +1,239 @@
+# PCIP-3:Pulsar Extended Transaction API Enhancement Proposal
+# Background knowledge
+When users consume messages using the Fail-over subscription mode and confirm
messages using cumulative Ack, duplicate consumption may occur. In this case,
even if users use Transaction, they cannot achieve Just-Once.
+As shown in the figure below, in failover mode, the two consumer1 and
consumer2 started simultaneously frequently undergo two disconnection switches.
+Finally, Consumer1 consumed M1, M2, M4, M5; Consumer2 consumed M1, M2, M3. M1
and M2 were consumed twice.
+
+
+
+# Goals
+Solve the problem of cumulative ack consumption duplication by designing a new
transaction API.
+Why not fix this issue in the Pulsar main repository?
+- The complexity of fixing problems without modifying the Client API is high,
and the problem-solving cycle is long.
+- There are indeed many confusing usage postures for existing transaction APIs
+ - For example, the abort () and commit () methods may seem synchronous in
name, but they are actually asynchronous. In actual use, you need to use abort
().get () and commit ().get ().
+- Modifying the Client API in the Pulsar main repository is a difficult task
because we cannot determine whether the new solution is necessarily perfect,
and the time cycle for updating the API is long.
+ The benefit of solving this problem in the contributor repository is:
+- By wrapping the original Transaction API, this problem can be solved in a
concise way. This wrapping can be seen as a best practice that does not affect
the use of existing users, while providing a reference solution for users who
encounter similar problems.
+# High Level Design
+Design a new API to place the context of message sending and consumption
within Transaction, which not only solves the problem of repeated consumption,
but also retains sufficient scalability for possible optimization in the future.
+- Solve the problem of repeated consumption - use the function of individual
ack messagelist to batch ack messages instead of the original cumulative ack.
+- Retained sufficient scalability - sending messages requires using
Transaction to construct messages, and consumed transaction messages are
recorded in Transaction. Later, more optimizations can be added using this
information without changing the interface.
+
+## Public-facing Changes
+### Public API
+The org.apache.pulsar.txn.api.Transaction interface is an optimized and
extended transaction interface designed to enhance usability and clarity in the
Pulsar contributors' library. It addresses issues with CumulativeAck and
transactions not preventing repeated message consumption, and it refines
ambiguous methods for better clarity. Key features include:
+- Message Recording: Records messages in a transaction without automatic
acknowledgment.
+- Asynchronous and Synchronous Acknowledgment: Supports both asynchronous and
synchronous acknowledgment of all received messages for specific consumers or
across all consumers.
+- Transactional Message Builder: Creates a new transactional message builder
for a given producer to construct and send messages within a transaction
context.
+- Committing and Aborting Transactions: Offers both asynchronous and
synchronous methods to commit or abort transactions, ensuring the effectiveness
of message sends and acknowledgments.
+- Transaction ID and State Retrieval: Provides methods to retrieve the unique
transaction ID and its current state to determine the transaction's lifecycle
phase.
+```java
+/**
+* Interface representing an optimized and extended transaction interface in
the Pulsar
+* contributors' library.
+*
+* <p>This interface provides enhancements and extensions to the base
transaction interface in
+* Pulsar. It specifically addresses the issue where using CumulativeAck with
transactions could not
+* prevent message consumed repeated. Additionally, it clarifies and optimizes
ambiguous methods for
+* better usability and clarity.
+ */
+ public interface Transaction {
+
+/**
+* Records a message in the transaction.
+*
+* <p>This method is used to include a message in the current transaction. The
message will not be
+* automatically acknowledged when the transaction is committed. Instead, it
must be explicitly
+* acknowledged by calling one of the ack methods.
+*
+* @param messageId the ID of the message to record
+* @param consumer the consumer that received the message
+ */
+ void recordMsg(MessageId messageId, Consumer<?> consumer);
+
+/**
+* Asynchronously acknowledges all received messages for a specific consumer in
the transaction.
+*
+* <p>This method is used to acknowledge all messages that have been recorded
for the specified
+* consumer in the transaction. The acknowledgment is asynchronous, and the
future can be used to
+* determine when the operation is complete.
+*
+* @param consumer the consumer that received the messages
+* @return a CompletableFuture that will be completed when the acknowledgment
is complete
+ */
+ CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer);
+
+/**
+* Acknowledges all received messages for a specific consumer in the
transaction.
+*
+* <p>This method is a synchronous version of {@link
#ackAllReceivedMsgsAsync(Consumer)}. It will
+* block until the acknowledgment is complete.
+*
+* @param consumer the consumer that received the messages
+* @throws ExecutionException if the acknowledgment fails
+* @throws InterruptedException if the thread is interrupted while waiting for
the acknowledgment
+* to complete
+*/
+void ackAllReceivedMsgs(Consumer<?> consumer) throws ExecutionException,
InterruptedException;
+
+/**
+* Acknowledges all received messages in the transaction.
+*
+* <p>This method is a convenience method that acknowledges all messages across
all consumers. It
+* will block until the acknowledgment is complete.
+*
+* @throws ExecutionException if the acknowledgment fails
+* @throws InterruptedException if the thread is interrupted while waiting for
the acknowledgment
+* to complete
+*/
+void ackAllReceivedMsgs() throws ExecutionException, InterruptedException;
+
+/**
+* Asynchronously acknowledges all received messages in the transaction.
+*
+* <p>This method is a convenience method that acknowledges all messages across
all consumers. The
+* acknowledgment is asynchronous, and the future can be used to determine when
the operation is
+* complete.
+*
+* @return a CompletableFuture that will be completed when the acknowledgment
is complete
+ */
+ CompletableFuture<Void> ackAllReceivedMsgsAsync();
+
+/**
+* Creates a new transactional message builder for the given producer.
+*
+* <p>This method returns a {@link TypedMessageBuilder} instance that is bound
to the specified
+* producer and transaction. The returned message builder can be used to
construct and send
+* messages within the context of a transaction.
+*
+* @param producer the producer instance used to send messages
+* @param <T> the type of messages produced by the producer
+* @return a TypedMessageBuilder instance for building transactional messages
+ */
+ <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> producer);
+
+/**
+* Asynchronously commits the transaction.
+*
+* <p>This method is used to commit the transaction, making all sent messages
and acknowledgments
+* effective. When the transaction is committed, consumers receive the
transaction messages and
+* the pending-ack state becomes ack state. The commit is asynchronous, and the
future can be used
+* to determine when the operation is complete.
+*
+* @return a CompletableFuture that will be completed when the commit is
complete
+ */
+ CompletableFuture<Void> commitAsync();
+
+/**
+* Asynchronously aborts the transaction.
+*
+* <p>This method is used to abort the transaction, discarding all send
messages and
+* acknowledgments. The abort is asynchronous, and the future can be used to
determine when the
+* operation is complete.
+*
+* @return a CompletableFuture that will be completed when the abort is complete
+ */
+ CompletableFuture<Void> abortAsync();
+
+/**
+* Commits the transaction.
+*
+* <p>This method is a synchronous version of {@link #commitAsync()}. It will
block until the
+* commit is complete.
+*
+* @throws ExecutionException if the commit fails
+* @throws InterruptedException if the thread is interrupted while waiting for
the commit to
+* complete
+*/
+void commit() throws ExecutionException, InterruptedException;
+
+/**
+* Aborts the transaction.
+*
+* <p>This method is a synchronous version of {@link #abortAsync()}. It will
block until the abort
+* is complete.
+*
+* @throws ExecutionException if the abort fails
+* @throws InterruptedException if the thread is interrupted while waiting for
the abort to
+* complete
+*/
+void abort() throws ExecutionException, InterruptedException;
+
+/**
+* Gets the transaction ID.
+*
+* <p>This method returns the unique identifier for the transaction.
+*
+* @return the transaction ID
+ */
+ TxnID getTxnID();
+
+/**
+* Gets the current state of the transaction.
+*
+* <p>This method returns the current state of the transaction, which can be
used to determine if
+* the transaction is open, committed, aborted, error or timeout.
+*
+* @return the current state of the transaction
+ */
+ org.apache.pulsar.client.api.transaction.Transaction.State getState();
+ }
+```
+
+# Get started
+## Quick Start
+```java
+public void transactionDemo() throws Exception {
+String pubTopic = "persistent://public/default/my-pub-topic";
+String subTopic = "persistent://public/default/my-sub-topic";
+String subscription = "my-subscription";
+
+// Create a Pulsar client instance
+PulsarClient client = SingletonPulsarContainer.createPulsarClient();
+
+// Create a Transaction object
+// Use TransactionFactory to create a transaction object with a timeout of 5
seconds
+Transaction transaction =
+TransactionFactory.createTransaction(client, 5, TimeUnit.SECONDS).get();
+
+// Create producers and a consumer
+// Create two producers to send messages to different topics
+Producer<String> producerToPubTopic =
client.newProducer(Schema.STRING).topic(pubTopic).create();
+Producer<String> producerToSubTopic =
client.newProducer(Schema.STRING).topic(subTopic).create();
+
+// Create a consumer to receive messages from the subTopic
+Consumer<String> consumerFromSubTopic = client
+.newConsumer(Schema.STRING)
+.subscriptionName(subscription)
+.topic(subTopic)
+.subscribe();
+
+// Send a message to the Sub Topic
+producerToSubTopic.send("Hello World");
+
+// Receive a message
+Message<String> receivedMessage = consumerFromSubTopic.receive();
+MessageId receivedMessageId = receivedMessage.getMessageId();
+
+// Record the message in the transaction
+transaction.recordMsg(receivedMessageId, consumerFromSubTopic);
+
+// Forward the transaction message to the pub topic
+// Use the transaction message builder to forward the received message to the
pubTopic
+transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send();
+
+// Acknowledge all received messages
+// Acknowledge all messages received from the subTopic within the transaction
+transaction.ackAllReceivedMsgs(consumerFromSubTopic);
+
+// Commit the transaction
+// Commit the transaction to ensure all recorded messages and acknowledgments
take effect
+transaction.commit();
+
+// Close the consumer, producers, and client to release resources
+consumerFromSubTopic.close();
+producerToSubTopic.close();
+client.close();
+}
+```
\ No newline at end of file
diff --git a/pcip/static/img/pcip-3/cumulative-ack-issue.png
b/pcip/static/img/pcip-3/cumulative-ack-issue.png
new file mode 100644
index 0000000..f64cced
Binary files /dev/null and b/pcip/static/img/pcip-3/cumulative-ack-issue.png
differ
diff --git a/pom.xml b/pom.xml
index 749f363..e57a6e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,8 @@
<commons-pool.version>2.12.0</commons-pool.version>
<awaitility.version>4.2.2</awaitility.version>
<testcontainers.version>1.20.1</testcontainers.version>
+ <junit.version>4.13.1</junit.version>
+ <mockito.version>5.12.0</mockito.version>
</properties>
<modules>
diff --git a/pulsar-transaction-contrib/pom.xml
b/pulsar-transaction-contrib/pom.xml
index c9f0077..01b4501 100644
--- a/pulsar-transaction-contrib/pom.xml
+++ b/pulsar-transaction-contrib/pom.xml
@@ -21,8 +21,45 @@
<artifactId>pulsar-java-contrib</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
- <inceptionYear>2024</inceptionYear>
<artifactId>pulsar-transaction-contrib</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-all</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>17</source>
+ <target>17</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <inceptionYear>2024</inceptionYear>
+
</project>
diff --git
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
new file mode 100644
index 0000000..d673032
--- /dev/null
+++
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/Transaction.java
@@ -0,0 +1,174 @@
+package org.apache.pulsar.txn.api;
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+/**
+ * Interface representing an optimized and extended transaction interface in
the Pulsar
+ * contributors' library.
+ *
+ * <p>This interface provides enhancements and extensions to the base
transaction interface in
+ * Pulsar. It specifically addresses the issue where using CumulativeAck with
transactions could not
+ * prevent message consumed repeated. Additionally, it clarifies and optimizes
ambiguous methods for
+ * better usability and clarity.
+ */
+public interface Transaction {
+
+ /**
+ * Records a message in the transaction.
+ *
+ * <p>This method is used to include a message in the current transaction.
The message will not be
+ * automatically acknowledged when the transaction is committed. Instead, it
must be explicitly
+ * acknowledged by calling one of the ack methods.
+ *
+ * @param messageId the ID of the message to record
+ * @param consumer the consumer that received the message
+ */
+ void recordMsg(MessageId messageId, Consumer<?> consumer);
+
+ /**
+ * Asynchronously acknowledges all received messages for a specific consumer
in the transaction.
+ *
+ * <p>This method is used to acknowledge all messages that have been
recorded for the specified
+ * consumer in the transaction. The acknowledgment is asynchronous, and the
future can be used to
+ * determine when the operation is complete.
+ *
+ * @param consumer the consumer that received the messages
+ * @return a CompletableFuture that will be completed when the
acknowledgment is complete
+ */
+ CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer);
+
+ /**
+ * Acknowledges all received messages for a specific consumer in the
transaction.
+ *
+ * <p>This method is a synchronous version of {@link
#ackAllReceivedMsgsAsync(Consumer)}. It will
+ * block until the acknowledgment is complete.
+ *
+ * @param consumer the consumer that received the messages
+ * @throws ExecutionException if the acknowledgment fails
+ * @throws InterruptedException if the thread is interrupted while waiting
for the acknowledgment
+ * to complete
+ */
+ void ackAllReceivedMsgs(Consumer<?> consumer) throws ExecutionException,
InterruptedException;
+
+ /**
+ * Acknowledges all received messages in the transaction.
+ *
+ * <p>This method is a convenience method that acknowledges all messages
across all consumers. It
+ * will block until the acknowledgment is complete.
+ *
+ * @throws ExecutionException if the acknowledgment fails
+ * @throws InterruptedException if the thread is interrupted while waiting
for the acknowledgment
+ * to complete
+ */
+ void ackAllReceivedMsgs() throws ExecutionException, InterruptedException;
+
+ /**
+ * Asynchronously acknowledges all received messages in the transaction.
+ *
+ * <p>This method is a convenience method that acknowledges all messages
across all consumers. The
+ * acknowledgment is asynchronous, and the future can be used to determine
when the operation is
+ * complete.
+ *
+ * @return a CompletableFuture that will be completed when the
acknowledgment is complete
+ */
+ CompletableFuture<Void> ackAllReceivedMsgsAsync();
+
+ /**
+ * Creates a new transactional message builder for the given producer.
+ *
+ * <p>This method returns a {@link TypedMessageBuilder} instance that is
bound to the specified
+ * producer and transaction. The returned message builder can be used to
construct and send
+ * messages within the context of a transaction.
+ *
+ * @param producer the producer instance used to send messages
+ * @param <T> the type of messages produced by the producer
+ * @return a TypedMessageBuilder instance for building transactional messages
+ */
+ <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T> producer);
+
+ /**
+ * Asynchronously commits the transaction.
+ *
+ * <p>This method is used to commit the transaction, making all sent
messages and acknowledgments
+ * effective. When the transaction is committed, consumers receive the
transaction messages and
+ * the pending-ack state becomes ack state. The commit is asynchronous, and
the future can be used
+ * to determine when the operation is complete.
+ *
+ * @return a CompletableFuture that will be completed when the commit is
complete
+ */
+ CompletableFuture<Void> commitAsync();
+
+ /**
+ * Asynchronously aborts the transaction.
+ *
+ * <p>This method is used to abort the transaction, discarding all send
messages and
+ * acknowledgments. The abort is asynchronous, and the future can be used to
determine when the
+ * operation is complete.
+ *
+ * @return a CompletableFuture that will be completed when the abort is
complete
+ */
+ CompletableFuture<Void> abortAsync();
+
+ /**
+ * Commits the transaction.
+ *
+ * <p>This method is a synchronous version of {@link #commitAsync()}. It
will block until the
+ * commit is complete.
+ *
+ * @throws ExecutionException if the commit fails
+ * @throws InterruptedException if the thread is interrupted while waiting
for the commit to
+ * complete
+ */
+ void commit() throws ExecutionException, InterruptedException;
+
+ /**
+ * Aborts the transaction.
+ *
+ * <p>This method is a synchronous version of {@link #abortAsync()}. It will
block until the abort
+ * is complete.
+ *
+ * @throws ExecutionException if the abort fails
+ * @throws InterruptedException if the thread is interrupted while waiting
for the abort to
+ * complete
+ */
+ void abort() throws ExecutionException, InterruptedException;
+
+ /**
+ * Gets the transaction ID.
+ *
+ * <p>This method returns the unique identifier for the transaction.
+ *
+ * @return the transaction ID
+ */
+ TxnID getTxnID();
+
+ /**
+ * Gets the current state of the transaction.
+ *
+ * <p>This method returns the current state of the transaction, which can be
used to determine if
+ * the transaction is open, committed, aborted, error or timeout.
+ *
+ * @return the current state of the transaction
+ */
+ org.apache.pulsar.client.api.transaction.Transaction.State getState();
+}
diff --git
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
new file mode 100644
index 0000000..b540b64
--- /dev/null
+++
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/TransactionFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.txn.impl.TransactionImpl;
+
+public class TransactionFactory {
+
+ /**
+ * Creates a new transaction with the specified timeout.
+ *
+ * @param pulsarClient the Pulsar client instance
+ * @param timeout the transaction timeout
+ * @param unit the time unit of the timeout
+ * @return a CompletableFuture that will be completed with the new
transaction
+ */
+ public static CompletableFuture<Transaction> createTransaction(
+ PulsarClient pulsarClient, long timeout, TimeUnit unit) {
+ // Create a transaction with the specified timeout
+ return pulsarClient
+ .newTransaction()
+ .withTransactionTimeout(timeout, unit)
+ .build()
+ .thenApply(TransactionImpl::new);
+ }
+}
diff --git
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
new file mode 100644
index 0000000..a1cf663
--- /dev/null
+++
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/api/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.api;
diff --git
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
new file mode 100644
index 0000000..94434cd
--- /dev/null
+++
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/TransactionImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn.impl;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.txn.api.Transaction;
+
+public class TransactionImpl implements Transaction {
+ @Getter
+ ConcurrentHashMap<Consumer<?>, List<MessageId>> receivedMessages = new
ConcurrentHashMap<>();
+
+ org.apache.pulsar.client.api.transaction.Transaction transaction;
+
+ public TransactionImpl(org.apache.pulsar.client.api.transaction.Transaction
transaction) {
+ this.transaction = transaction;
+ }
+
+ @Override
+ public void recordMsg(MessageId messageId, Consumer<?> consumer) {
+ receivedMessages.computeIfAbsent(consumer, k -> new
CopyOnWriteArrayList<>()).add(messageId);
+ }
+
+ @Override
+ public CompletableFuture<Void> ackAllReceivedMsgsAsync(Consumer<?> consumer)
{
+ List<MessageId> messageIds = receivedMessages.remove(consumer);
+ if (messageIds != null) {
+ return consumer.acknowledgeAsync(messageIds, transaction);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public void ackAllReceivedMsgs(Consumer<?> consumer)
+ throws ExecutionException, InterruptedException {
+ ackAllReceivedMsgsAsync(consumer).get();
+ }
+
+ @Override
+ public void ackAllReceivedMsgs() throws ExecutionException,
InterruptedException {
+ ackAllReceivedMsgsAsync().get();
+ }
+
+ @Override
+ public CompletableFuture<Void> ackAllReceivedMsgsAsync() {
+ return FutureUtil.waitForAll(
+ receivedMessages.keySet().stream()
+ .map(this::ackAllReceivedMsgsAsync)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public <T> TypedMessageBuilder<T> newTransactionMessage(Producer<T>
producer) {
+ return producer.newMessage(transaction);
+ }
+
+ @Override
+ public CompletableFuture<Void> commitAsync() {
+ return transaction.commit();
+ }
+
+ @Override
+ public CompletableFuture<Void> abortAsync() {
+ return transaction.abort();
+ }
+
+ @Override
+ public void commit() throws ExecutionException, InterruptedException {
+ transaction.commit().get();
+ }
+
+ @Override
+ public void abort() throws ExecutionException, InterruptedException {
+ transaction.abort().get();
+ }
+
+ @Override
+ public TxnID getTxnID() {
+ return transaction.getTxnID();
+ }
+
+ @Override
+ public org.apache.pulsar.client.api.transaction.Transaction.State getState()
{
+ return transaction.getState();
+ }
+}
diff --git
a/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
new file mode 100644
index 0000000..2ec84e4
--- /dev/null
+++
b/pulsar-transaction-contrib/src/main/java/org/apache/pulsar/txn/impl/package-info.java
@@ -0,0 +1,14 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn.impl;
diff --git
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
new file mode 100644
index 0000000..afd3bdc
--- /dev/null
+++
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/SingletonPulsarContainer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+public class SingletonPulsarContainer {
+
+ private static final PulsarContainer PULSAR_CONTAINER;
+
+ static {
+ PULSAR_CONTAINER = new PulsarContainer(getPulsarImage())
+
.withEnv("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true")
+ .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
+ .withStartupTimeout(Duration.ofMinutes(3));
+ PULSAR_CONTAINER.start();
+ }
+
+ private static DockerImageName getPulsarImage() {
+ return DockerImageName.parse("apachepulsar/pulsar:" +
getPulsarImageVersion());
+ }
+
+ private static String getPulsarImageVersion() {
+ String pulsarVersion = "";
+ Properties properties = new Properties();
+ try {
+ properties.load(SingletonPulsarContainer.class.getClassLoader()
+ .getResourceAsStream("pulsar-container.properties"));
+ if (!properties.isEmpty()) {
+ pulsarVersion = properties.getProperty("pulsar.version");
+ }
+ } catch (IOException e) {
+ log.error("Failed to load pulsar version. " + e.getCause());
+ }
+ return pulsarVersion;
+ }
+
+ static PulsarClient createPulsarClient() throws PulsarClientException {
+ return PulsarClient.builder()
+
.serviceUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getPulsarBrokerUrl())
+ .enableTransaction(true)
+ .build();
+ }
+
+ static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+ return PulsarAdmin.builder()
+
.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+ .build();
+ }
+}
diff --git
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
new file mode 100644
index 0000000..308c521
--- /dev/null
+++
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionDemo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.txn;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.txn.api.Transaction;
+import org.apache.pulsar.txn.api.TransactionFactory;
+import org.testng.annotations.Test;
+
+public class TransactionDemo {
+
+ @Test
+ public void transactionDemo() throws Exception {
+ String pubTopic = "persistent://public/default/my-pub-topic";
+ String subTopic = "persistent://public/default/my-sub-topic";
+ String subscription = "my-subscription";
+
+ // Create a Pulsar client instance
+ PulsarClient client = SingletonPulsarContainer.createPulsarClient();
+
+ // Create a Transaction object
+ // Use TransactionFactory to create a transaction object with a timeout of
5 seconds
+ Transaction transaction =
+ TransactionFactory.createTransaction(client, 5,
TimeUnit.SECONDS).get();
+
+ // Create producers and a consumer
+ // Create two producers to send messages to different topics
+ Producer<String> producerToPubTopic =
client.newProducer(Schema.STRING).topic(pubTopic).create();
+ Producer<String> producerToSubTopic =
client.newProducer(Schema.STRING).topic(subTopic).create();
+
+ // Create a consumer to receive messages from the subTopic
+ Consumer<String> consumerFromSubTopic = client
+ .newConsumer(Schema.STRING)
+ .subscriptionName(subscription)
+ .topic(subTopic)
+ .subscribe();
+
+ // Send a message to the Sub Topic
+ producerToSubTopic.send("Hello World");
+
+ // Receive a message
+ Message<String> receivedMessage = consumerFromSubTopic.receive();
+ MessageId receivedMessageId = receivedMessage.getMessageId();
+
+ // Record the message in the transaction
+ transaction.recordMsg(receivedMessageId, consumerFromSubTopic);
+
+ // Forward the transaction message to the pub topic
+ // Use the transaction message builder to forward the received message to
the pubTopic
+
transaction.newTransactionMessage(producerToSubTopic).value(receivedMessage.getValue()).send();
+
+ // Acknowledge all received messages
+ // Acknowledge all messages received from the subTopic within the
transaction
+ transaction.ackAllReceivedMsgs(consumerFromSubTopic);
+
+ // Commit the transaction
+ // Commit the transaction to ensure all recorded messages and
acknowledgments take effect
+ transaction.commit();
+
+ // Close the consumer, producers, and client to release resources
+ consumerFromSubTopic.close();
+ producerToSubTopic.close();
+ client.close();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
new file mode 100644
index 0000000..e4171a6
--- /dev/null
+++
b/pulsar-transaction-contrib/src/test/java/org/apache/pulsar/txn/TransactionImplTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pulsar.txn;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.txn.impl.TransactionImpl;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TransactionImplTest {
+
+ private TransactionImpl transactionImpl;
+ private org.apache.pulsar.client.api.transaction.Transaction mockTransaction;
+ private List<Consumer<?>> mockConsumers;
+ private List<MessageId> messageIds;
+
+ @BeforeMethod
+ public void setUp() {
+ mockTransaction =
mock(org.apache.pulsar.client.api.transaction.Transaction.class);
+ mockConsumers = new ArrayList<>();
+ messageIds = new ArrayList<>();
+
+ // Create two mock consumers and two message IDs
+ for (int i = 0; i < 2; i++) {
+ Consumer<?> mockConsumer = mock(Consumer.class);
+ MessageId messageId = mock(MessageId.class);
+ mockConsumers.add(mockConsumer);
+ messageIds.add(messageId);
+ }
+
+ transactionImpl = new TransactionImpl(mockTransaction);
+ }
+
+ @Test
+ public void testRecordMsg() {
+ // Record a message for a consumer
+ Consumer<?> consumer = mockConsumers.get(0);
+ MessageId messageId = messageIds.get(0);
+ transactionImpl.recordMsg(messageId, consumer);
+
+ // Verify the message is recorded for the consumer
+
assertTrue(transactionImpl.getReceivedMessages().get(consumer).contains(messageId));
+ }
+
+ @Test
+ public void testAckAllReceivedMsgsAsync() throws ExecutionException,
InterruptedException {
+ // Record messages for different consumers
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ transactionImpl.recordMsg(messageId, consumer);
+ }
+
+ // Mock the acknowledgeAsync method for each consumer
+ for (Consumer<?> consumer : mockConsumers) {
+ when(consumer.acknowledgeAsync(anyList(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ // Call the ackAllReceivedMsgsAsync method
+ CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
+ future.get();
+
+ // Verify each consumer called the correct acknowledgeAsync method with
the correct message IDs
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ verify(consumer).acknowledgeAsync(eq(List.of(messageId)),
eq(mockTransaction));
+ }
+ }
+
+ @Test
+ public void testAckAllReceivedMsgs() throws ExecutionException,
InterruptedException {
+ // Record messages for a consumer
+ Consumer<?> consumer = mockConsumers.get(0);
+ MessageId messageId = messageIds.get(0);
+ transactionImpl.recordMsg(messageId, consumer);
+
+ // Mock the acknowledgeAsync method for the consumer
+ when(consumer.acknowledgeAsync(anyList(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ // Call the ackAllReceivedMsgs method
+ transactionImpl.ackAllReceivedMsgs(consumer);
+
+ // Verify the consumer called the correct acknowledgeAsync method with the
correct message IDs
+ verify(consumer).acknowledgeAsync(eq(List.of(messageId)),
eq(mockTransaction));
+ // Verify the message Ids were removed from the transaction context after
acked.
+ assertEquals(transactionImpl.getReceivedMessages().size(), 0);
+ }
+
+ @Test
+ public void testAckAllReceivedMsgsAll() throws ExecutionException,
InterruptedException {
+ // Record messages for different consumers
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ transactionImpl.recordMsg(messageId, consumer);
+ }
+
+ // Mock the acknowledgeAsync method for each consumer
+ for (Consumer<?> consumer : mockConsumers) {
+ when(consumer.acknowledgeAsync(anyList(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ // Call the ackAllReceivedMsgs method
+ transactionImpl.ackAllReceivedMsgs();
+
+ // Verify each consumer called the correct acknowledgeAsync method with
the correct message IDs
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ verify(consumer).acknowledgeAsync(eq(List.of(messageId)),
eq(mockTransaction));
+ }
+ }
+
+ @Test
+ public void testAckAllReceivedMsgsAsyncAll() throws ExecutionException,
InterruptedException {
+ // Record messages for different consumers
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ transactionImpl.recordMsg(messageId, consumer);
+ }
+
+ // Mock the acknowledgeAsync method for each consumer
+ for (Consumer<?> consumer : mockConsumers) {
+ when(consumer.acknowledgeAsync(anyList(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ // Call the ackAllReceivedMsgsAsync method
+ CompletableFuture<Void> future = transactionImpl.ackAllReceivedMsgsAsync();
+ future.get();
+
+ // Verify each consumer called the correct acknowledgeAsync method with
the correct message IDs
+ for (int i = 0; i < mockConsumers.size(); i++) {
+ Consumer<?> consumer = mockConsumers.get(i);
+ MessageId messageId = messageIds.get(i);
+ verify(consumer).acknowledgeAsync(eq(List.of(messageId)),
eq(mockTransaction));
+ }
+ }
+
+ @Test
+ public void testCommitAsync() throws ExecutionException,
InterruptedException {
+ // Mock the commit method of the transaction
+
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // Call the commitAsync method
+ CompletableFuture<Void> future = transactionImpl.commitAsync();
+ future.get();
+
+ // Verify the commit method was called
+ verify(mockTransaction).commit();
+ }
+
+ @Test
+ public void testAbortAsync() throws ExecutionException, InterruptedException
{
+ // Mock the abort method of the transaction
+
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // Call the abortAsync method
+ CompletableFuture<Void> future = transactionImpl.abortAsync();
+ future.get();
+
+ // Verify the abort method was called
+ verify(mockTransaction).abort();
+ }
+
+ @Test
+ public void testCommit() throws ExecutionException, InterruptedException {
+ // Mock the commit method of the transaction
+
when(mockTransaction.commit()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // Call the commit method
+ transactionImpl.commit();
+
+ // Verify the commit method was called
+ verify(mockTransaction).commit();
+ }
+
+ @Test
+ public void testAbort() throws ExecutionException, InterruptedException {
+ // Mock the abort method of the transaction
+
when(mockTransaction.abort()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // Call the abort method
+ transactionImpl.abort();
+
+ // Verify the abort method was called
+ verify(mockTransaction).abort();
+ }
+
+ @Test
+ public void testGetTxnID() {
+ // Mock the getTxnID method of the transaction
+ TxnID txnID = mock(TxnID.class);
+ when(mockTransaction.getTxnID()).thenReturn(txnID);
+
+ // Call the getTxnID method
+ assertEquals(txnID, transactionImpl.getTxnID());
+ }
+
+ @Test
+ public void testGetState() {
+ // Mock the getState method of the transaction
+ org.apache.pulsar.client.api.transaction.Transaction.State state =
+ org.apache.pulsar.client.api.transaction.Transaction.State.OPEN;
+ when(mockTransaction.getState()).thenReturn(state);
+
+ // Call the getState method
+ assertEquals(state, transactionImpl.getState());
+ }
+}