This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 74f3aec4 [ISSUE #688] Add information about transaction Source on
endTransaction Request (#689)
74f3aec4 is described below
commit 74f3aec491a269181215fe3b3cd2bbbcf14678a9
Author: Jixiang Jin <[email protected]>
AuthorDate: Wed Feb 28 15:26:55 2024 +0800
[ISSUE #688] Add information about transaction Source on endTransaction
Request (#689)
---
.../client/java/impl/producer/ProducerImpl.java | 12 +++++++-----
.../client/java/impl/producer/TransactionImpl.java | 21 +++++++++++++++++----
.../java/impl/producer/TransactionImplTest.java | 5 +++--
3 files changed, 27 insertions(+), 11 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 450a68d5..a17d1958 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.impl.producer;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import apache.rocketmq.v2.ClientType;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.EndTransactionRequest;
@@ -29,6 +27,8 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
+import apache.rocketmq.v2.TransactionSource;
+import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -163,7 +163,7 @@ class ProducerImpl extends ClientImpl implements Producer {
}
final GeneralMessage generalMessage = new
GeneralMessageImpl(messageView);
endTransaction(endpoints, generalMessage,
messageView.getMessageId(),
- transactionId, resolution);
+ transactionId, resolution,
TransactionSource.SOURCE_SERVER_CHECK);
} catch (Throwable t) {
log.error("Exception raised while ending the transaction,
messageId={}, transactionId={}, "
+ "endpoints={}, clientId={}", messageId,
transactionId, endpoints, clientId, t);
@@ -241,7 +241,7 @@ class ProducerImpl extends ClientImpl implements Producer {
*/
@Override
public Transaction beginTransaction() {
- checkNotNull(checker, "Transaction checker should not be null");
+ Preconditions.checkNotNull(checker, "Transaction checker should not be
null");
if (!this.isRunning()) {
log.error("Unable to begin a transaction because producer is not
running, state={}, clientId={}",
this.state(), clientId);
@@ -256,9 +256,11 @@ class ProducerImpl extends ClientImpl implements Producer {
}
public void endTransaction(Endpoints endpoints, GeneralMessage
generalMessage, MessageId messageId,
- String transactionId, final TransactionResolution resolution) throws
ClientException {
+ String transactionId, final TransactionResolution resolution, final
TransactionSource transactionSource)
+ throws ClientException {
final EndTransactionRequest.Builder builder =
EndTransactionRequest.newBuilder()
.setMessageId(messageId.toString()).setTransactionId(transactionId)
+ .setSource(transactionSource)
.setTopic(apache.rocketmq.v2.Resource.newBuilder()
.setResourceNamespace(clientConfiguration.getNamespace())
.setName(generalMessage.getTopic())
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index 51ca92cc..9bc97c24 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.java.impl.producer;
+import apache.rocketmq.v2.TransactionSource;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.util.HashSet;
@@ -94,8 +95,14 @@ class TransactionImpl implements Transaction {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry :
messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), new
GeneralMessageImpl(publishingMessage),
- sendReceipt.getMessageId(), sendReceipt.getTransactionId(),
TransactionResolution.COMMIT);
+ producerImpl.endTransaction(
+ sendReceipt.getEndpoints(),
+ new GeneralMessageImpl(publishingMessage),
+ sendReceipt.getMessageId(),
+ sendReceipt.getTransactionId(),
+ TransactionResolution.COMMIT,
+ TransactionSource.SOURCE_CLIENT
+ );
}
}
@@ -107,8 +114,14 @@ class TransactionImpl implements Transaction {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry :
messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), new
GeneralMessageImpl(publishingMessage),
- sendReceipt.getMessageId(), sendReceipt.getTransactionId(),
TransactionResolution.ROLLBACK);
+ producerImpl.endTransaction(
+ sendReceipt.getEndpoints(),
+ new GeneralMessageImpl(publishingMessage),
+ sendReceipt.getMessageId(),
+ sendReceipt.getTransactionId(),
+ TransactionResolution.ROLLBACK,
+ TransactionSource.SOURCE_CLIENT
+ );
}
}
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index 6cca321b..68d05078 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.client.java.impl.producer;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
+import apache.rocketmq.v2.TransactionSource;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
@@ -115,7 +116,7 @@ public class TransactionImplTest extends TestBase {
final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class),
any(GeneralMessage.class),
- any(MessageId.class), anyString(),
any(TransactionResolution.class));
+ any(MessageId.class), anyString(),
any(TransactionResolution.class), any(TransactionSource.class));
transaction.commit();
}
@@ -130,7 +131,7 @@ public class TransactionImplTest extends TestBase {
final SendReceiptImpl sendReceipt =
fakeSendReceiptImpl(fakeMessageQueueImpl(FAKE_TOPIC_0));
transaction.tryAddReceipt(publishingMessage, sendReceipt);
Mockito.doNothing().when(producer).endTransaction(any(Endpoints.class),
any(GeneralMessage.class),
- any(MessageId.class), anyString(),
any(TransactionResolution.class));
+ any(MessageId.class), anyString(),
any(TransactionResolution.class), any(TransactionSource.class));
transaction.rollback();
}
}
\ No newline at end of file