shibd commented on code in PR #1002:
URL: https://github.com/apache/pulsar-client-go/pull/1002#discussion_r1186124806
##########
pulsar/transaction_test.go:
##########
@@ -209,6 +209,7 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
URL: webServiceURLTLS,
TLSTrustCertsFilePath: caCertsPath,
Authentication: NewAuthenticationTLS(tlsClientCertPath, tlsClientKeyPath),
+ EnableTransaction: true,
Review Comment:
If transactions are enabled, we do not need to create the
`TransactionCoordinatorClient` again (line 215). Just get it with `tcClient :=
c.(*client).tcClient`.
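A minimal, self-contained sketch of the idea follows. It does not use the real pulsar-client-go types: `Client`, `client`, `coordinator`, and `newClient` are hypothetical stand-ins, and only the `c.(*client).tcClient` access pattern is taken from the suggestion above. The assumption is that the client builds its own coordinator when transactions are enabled, so the test helper can reuse it.

```go
package main

import "fmt"

// Client stands in for the public client interface (hypothetical).
type Client interface {
	Close()
}

// coordinator stands in for transactionCoordinatorClient (hypothetical).
type coordinator struct{ started bool }

// client is the unexported implementation behind Client (hypothetical).
type client struct {
	tcClient *coordinator
}

func (c *client) Close() {}

// newClient creates the coordinator only when transactions are enabled.
func newClient(enableTransaction bool) Client {
	c := &client{}
	if enableTransaction {
		c.tcClient = &coordinator{started: true}
	}
	return c
}

// createTcClient reuses the coordinator the client already owns
// instead of constructing a second one.
func createTcClient() (*coordinator, *client) {
	c := newClient(true)
	impl := c.(*client) // type assertion to reach the unexported field
	return impl.tcClient, impl
}

func main() {
	tc, c := createTcClient()
	fmt.Println(tc == c.tcClient, tc.started)
}
```

With this shape the helper returns the same coordinator instance the client uses internally, so the test exercises the code path that real callers hit.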
##########
pulsar/consumer_partition.go:
##########
@@ -1389,6 +1471,14 @@ type ackRequest struct {
err error
}
+type ackWithTxnRequest struct {
+ doneCh chan struct{}
+ msgID trackingMessageID
+ Transaction *transaction
+ ackType int
Review Comment:
It seems `ackType` is redundant because it is always `Individual`.
##########
pulsar/transaction_test.go:
##########
@@ -217,3 +218,109 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) {
return tcClient, c.(*client)
}
+
+// TestConsumeAndProduceWithTxn is a test function that validates the behavior of producing and consuming
+// messages with and without transactions. It consists of the following steps:
+//
+// 1. Prepare: Create a PulsarClient and initialize the transaction coordinator client.
+// 2. Prepare: Create a topic and a subscription.
+// 3. Produce 10 messages with a transaction and 10 messages without a transaction.
+// - Expectation: The consumer should be able to receive the 10 messages sent without a transaction,
+// but not the 10 messages sent with the transaction.
+// 4. Commit the transaction and receive the remaining 10 messages.
+// - Expectation: The consumer should be able to receive the 10 messages sent with the transaction.
+// 5. Clean up: Close the consumer and producer instances.
+//
+// The test ensures that the consumer can only receive messages sent with a transaction after it is committed,
+// and that it can always receive messages sent without a transaction.
+func TestConsumeAndProduceWithTxn(t *testing.T) {
Review Comment:
This unit test covers the following scenarios:
1. The producer sends some messages to the topic. Before the txn is
committed, none of the messages are visible to the consumer.
2. When the consumer receives messages, it uses `ackWithTxn` to ack the
messages together.
I think we need more unit tests to cover the transaction feature:
1. SourceTopic -> **consumer.ack and producer.send** -> SinkTopic. We need
to guarantee that `ack source topic message` and `send message to sink topic`
happen in one transaction.
2. Transaction abort?
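To make the two missing scenarios concrete, here is a rough in-memory sketch of the behavior such tests would assert. It does not call the Pulsar client at all: `topic`, `txn`, and `transfer` are hypothetical stand-ins that only model the expected semantics, namely that acks on the source topic and sends to the sink topic take effect together on commit and not at all on abort.

```go
package main

import "fmt"

// topic is an in-memory stand-in for a Pulsar topic (hypothetical).
type topic struct {
	msgs  []string        // messages visible to consumers
	acked map[string]bool // payloads that have been acknowledged
}

func newTopic(msgs ...string) *topic {
	return &topic{msgs: msgs, acked: map[string]bool{}}
}

type pendingSend struct {
	to      *topic
	payload string
}

type pendingAck struct {
	from    *topic
	payload string
}

// txn buffers sends and acks until Commit or Abort (hypothetical).
type txn struct {
	sends []pendingSend
	acks  []pendingAck
}

func (t *txn) Send(to *topic, payload string)  { t.sends = append(t.sends, pendingSend{to, payload}) }
func (t *txn) Ack(from *topic, payload string) { t.acks = append(t.acks, pendingAck{from, payload}) }

// Commit applies every buffered send and ack.
func (t *txn) Commit() {
	for _, s := range t.sends {
		s.to.msgs = append(s.to.msgs, s.payload)
	}
	for _, a := range t.acks {
		a.from.acked[a.payload] = true
	}
	t.sends, t.acks = nil, nil
}

// Abort discards every buffered send and ack.
func (t *txn) Abort() { t.sends, t.acks = nil, nil }

// transfer acks each source message and forwards it to sink in one txn.
func transfer(source, sink *topic, commit bool) {
	t := &txn{}
	for _, m := range source.msgs {
		t.Ack(source, m)
		t.Send(sink, "processed:"+m)
	}
	if commit {
		t.Commit()
	} else {
		t.Abort()
	}
}

func main() {
	src, sink := newTopic("a", "b"), newTopic()
	transfer(src, sink, false) // abort: nothing acked, nothing forwarded
	fmt.Println(len(sink.msgs), len(src.acked))
	transfer(src, sink, true) // commit: both acked and forwarded
	fmt.Println(len(sink.msgs), len(src.acked))
}
```

A real test would replace the stand-ins with a consumer on the source topic and a producer on the sink topic, then check the same two outcomes against the broker: after an abort the source messages are redelivered and the sink stays empty, and after a commit the source messages are acked and the sink messages become visible.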