This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 90764e7a030 [fix][txn] Always send correct transaction id in end txn response (#19137)
90764e7a030 is described below
commit 90764e7a03034589764343202196508e7950584e
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Jan 10 07:14:47 2023 +0100
[fix][txn] Always send correct transaction id in end txn response (#19137)
---
.../pulsar/broker/service/PulsarCommandSender.java | 2 +-
.../broker/service/PulsarCommandSenderImpl.java | 7 +-
.../apache/pulsar/broker/service/ServerCnx.java | 44 ++--
.../pulsar/broker/service/ServerCnxTest.java | 284 ++++++++++++++++++++-
.../broker/service/utils/ClientChannelHelper.java | 35 ++-
.../client/impl/TransactionMetaStoreHandler.java | 62 +++--
.../apache/pulsar/common/protocol/Commands.java | 16 +-
7 files changed, 382 insertions(+), 68 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index 50579b3d741..f25703757d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -87,7 +87,7 @@ public interface PulsarCommandSender {
void sendNewTxnResponse(long requestId, TxnID txnID, long tcID);
- void sendNewTxnErrorResponse(long requestId, long txnID, ServerError
error, String message);
+ void sendNewTxnErrorResponse(long requestId, long tcID, ServerError error,
String message);
void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index b5f4d17801c..e7cc25b2c3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -331,8 +331,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
}
@Override
- public void sendNewTxnErrorResponse(long requestId, long txnID,
ServerError error, String message) {
- BaseCommand command = Commands.newTxnResponse(requestId, txnID, error,
message);
+ public void sendNewTxnErrorResponse(long requestId, long tcID, ServerError
error, String message) {
+ BaseCommand command = Commands.newTxnResponse(requestId, tcID, error,
message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
@@ -352,7 +352,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
@Override
public void sendEndTxnErrorResponse(long requestId, TxnID txnID,
ServerError error, String message) {
- BaseCommand command = Commands.newEndTxnResponse(requestId,
txnID.getMostSigBits(), error, message);
+ BaseCommand command = Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(),
+ txnID.getMostSigBits(), error, message);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9e80e98064a..a61468c6469 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2406,7 +2406,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
} else {
ex = handleTxnException(ex,
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
-
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getMostSigBits(),
+
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
+ txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
@@ -2451,7 +2453,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
final long requestId = command.getRequestId();
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
- TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
if (log.isDebugEnabled()) {
@@ -2487,7 +2489,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
ServerError.ServiceNotReady,
"The topic " + topic + " does not
exist in broker.",
- txnID.getMostSigBits(),
txnID.getLeastSigBits()));
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
log.warn("handleEndTxnOnPartition fail ! The
topic {} has not been created, "
+ "txnId: [{}], txnAction:
[{}]",
@@ -2496,13 +2498,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnPartition fail ! topic {}, "
- + "txnId: [{}], txnAction: [{}]", topic,
txnID,
- TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
- requestId, ServerError.ServiceNotReady,
- e.getMessage(), txnID.getLeastSigBits(),
txnID.getMostSigBits()));
- return null;
+ log.error("handleEndTxnOnPartition fail ! topic
{}, "
+ + "txnId: [{}], txnAction: [{}]",
topic, txnID,
+ TxnAction.valueOf(txnAction),
e.getCause());
+
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
+ requestId, ServerError.ServiceNotReady,
+ e.getMessage(), txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ return null;
});
}
}).exceptionally(e -> {
@@ -2556,7 +2558,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
BrokerServiceException.getClientErrorCode(e),
- "Handle end txn on subscription failed."));
+ "Handle end txn on subscription failed: " +
e.getMessage()));
return;
}
ctx.writeAndFlush(
@@ -2569,7 +2571,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
if (b) {
log.error("handleEndTxnOnSubscription fail!
The topic {} does not exist in broker, "
+ "subscription: {}, txnId:
[{}], txnAction: [{}]", topic, subName,
- new TxnID(txnidMostBits,
txnidLeastBits), TxnAction.valueOf(txnAction));
+ txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
ServerError.ServiceNotReady,
@@ -2582,13 +2584,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
}
}).exceptionally(e -> {
- log.error("handleEndTxnOnSubscription fail ! topic {},
subscription: {}"
- + "txnId: [{}], txnAction: [{}]", topic,
subName,
- txnID, TxnAction.valueOf(txnAction), e.getCause());
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
- ServerError.ServiceNotReady, e.getMessage()));
- return null;
+ log.error("handleEndTxnOnSubscription fail ! topic
{}, subscription: {}"
+ + "txnId: [{}], txnAction: [{}]",
topic, subName,
+ txnID, TxnAction.valueOf(txnAction),
e.getCause());
+
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
+ requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits(),
+ ServerError.ServiceNotReady,
e.getMessage()));
+ return null;
});
}
}).exceptionally(e -> {
@@ -2598,7 +2600,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
- "Handle end txn on subscription failed."));
+ "Handle end txn on subscription failed: " +
e.getMessage()));
return null;
});
}
@@ -2655,7 +2657,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
} else {
ex = handleTxnException(ex,
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
-
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 97b58d99e3d..1a98822340f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,6 +22,9 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
@@ -76,13 +79,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
@@ -92,13 +95,19 @@ import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandError;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
@@ -114,6 +123,8 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -151,7 +162,6 @@ public class ServerCnxTest {
private ClientChannelHelper clientChannelHelper;
private PulsarService pulsar;
private MetadataStoreExtended store;
- private ConfigurationCacheService configCacheService;
private NamespaceResources namespaceResources;
protected NamespaceService namespaceService;
private final int currentProtocolVersion =
ProtocolVersion.values()[ProtocolVersion.values().length - 1]
@@ -2059,7 +2069,7 @@ public class ServerCnxTest {
return false;
}
};
- Mockito.when(consumers.putIfAbsent(Mockito.anyLong(),
Mockito.any())).thenReturn(existingConsumerFuture);
+ Mockito.when(consumers.putIfAbsent(anyLong(),
Mockito.any())).thenReturn(existingConsumerFuture);
// do test: delay complete after execute 'isDone()' many times
// Why is the design so complicated, see:
https://github.com/apache/pulsar/pull/15051
try (MockedStatic<ConcurrentLongHashMap> theMock =
Mockito.mockStatic(ConcurrentLongHashMap.class)) {
@@ -2098,12 +2108,12 @@ public class ServerCnxTest {
}
// case3: exists existingConsumerFuture, already complete and exception
CompletableFuture existingConsumerFuture =
Mockito.mock(CompletableFuture.class);
- Mockito.when(consumers.putIfAbsent(Mockito.anyLong(),
Mockito.any())).thenReturn(existingConsumerFuture);
+ Mockito.when(consumers.putIfAbsent(anyLong(),
Mockito.any())).thenReturn(existingConsumerFuture);
// make consumerFuture delay finish
Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
// when sync get return, future will return success value.
Mockito.when(existingConsumerFuture.get()).thenThrow(new
NullPointerException());
- Mockito.when(existingConsumerFuture.get(Mockito.anyLong(),
Mockito.any())).
+ Mockito.when(existingConsumerFuture.get(anyLong(), Mockito.any())).
thenThrow(new NullPointerException());
Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new
NullPointerException());
@@ -2326,4 +2336,268 @@ public class ServerCnxTest {
assertEquals(((CommandPartitionedTopicMetadataResponse)
response).getError(), ServerError.ServiceNotReady);
channel.finish();
}
+
+ @Test(timeOut = 30000)
+ public void sendAddPartitionToTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+ List.of("tenant/ns/topic1"));
+ channel.writeInbound(clientCommand);
+ CommandAddPartitionToTxnResponse response =
(CommandAddPartitionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddPartitionToTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.addProducedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand = Commands.newAddPartitionToTxn(89L, 1L, 12L,
+ List.of("tenant/ns/topic1"));
+ channel.writeInbound(clientCommand);
+ CommandAddPartitionToTxnResponse response =
(CommandAddPartitionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddSubscriptionToTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ final Subscription sub = new Subscription();
+ sub.setTopic("topic1");
+ sub.setSubscription("sub1");
+ ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+ List.of(sub));
+ channel.writeInbound(clientCommand);
+ CommandAddSubscriptionToTxnResponse response =
(CommandAddSubscriptionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendAddSubscriptionToTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.addAckedPartitionToTxn(any(TxnID.class), any()))
+ .thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ final Subscription sub = new Subscription();
+ sub.setTopic("topic1");
+ sub.setSubscription("sub1");
+ ByteBuf clientCommand = Commands.newAddSubscriptionToTxn(89L, 1L, 12L,
+ List.of(sub));
+ channel.writeInbound(clientCommand);
+ CommandAddSubscriptionToTxnResponse response =
(CommandAddSubscriptionToTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand =
Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+ TxnAction.COMMIT));
+ channel.writeInbound(clientCommand);
+ CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.failedFuture(new
RuntimeException("server error")));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ ByteBuf clientCommand =
Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L,
+ TxnAction.COMMIT));
+ channel.writeInbound(clientCommand);
+ CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnPartitionResponse() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+
doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class),
anyInt(), anyLong());
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+ ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+ successTopicName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnPartitionResponse response =
(CommandEndTxnOnPartitionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnPartitionResponseFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+ doReturn(CompletableFuture.failedFuture(new RuntimeException("server
error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+ ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
+ successTopicName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnPartitionResponse response =
(CommandEndTxnOnPartitionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "server error");
+
+ channel.finish();
+ }
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnSubscription() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+ final org.apache.pulsar.broker.service.Subscription sub =
mock(org.apache.pulsar.broker.service.Subscription.class);
+ doReturn(sub).when(topic).getSubscription(any());
+ doReturn(CompletableFuture.completedFuture(null))
+ .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+ ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+ successTopicName, successSubName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnSubscriptionResponse response =
(CommandEndTxnOnSubscriptionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertFalse(response.hasError());
+ assertFalse(response.hasMessage());
+
+ channel.finish();
+ }
+
+
+ @Test(timeOut = 30000)
+ public void sendEndTxnOnSubscriptionFailed() throws Exception {
+ final TransactionMetadataStoreService txnStore =
mock(TransactionMetadataStoreService.class);
+ when(txnStore.endTransaction(any(TxnID.class), anyInt(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ when(pulsar.getTransactionMetadataStoreService()).thenReturn(txnStore);
+
+ svcConfig.setTransactionCoordinatorEnabled(true);
+ resetChannel();
+ setChannelConnected();
+ Topic topic = mock(Topic.class);
+
+ final org.apache.pulsar.broker.service.Subscription sub =
mock(org.apache.pulsar.broker.service.Subscription.class);
+ doReturn(sub).when(topic).getSubscription(any());
+ doReturn(CompletableFuture.failedFuture(new RuntimeException("server
error")))
+ .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
+ ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
+ successTopicName, successSubName, TxnAction.COMMIT, 1L);
+ channel.writeInbound(clientCommand);
+ CommandEndTxnOnSubscriptionResponse response =
(CommandEndTxnOnSubscriptionResponse) getResponse();
+
+ assertEquals(response.getRequestId(), 89L);
+ assertEquals(response.getTxnidLeastBits(), 1L);
+ assertEquals(response.getTxnidMostBits(), 12L);
+ assertEquals(response.getError().getValue(), 0);
+ assertEquals(response.getMessage(), "Handle end txn on subscription
failed: server error");
+
+ channel.finish();
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 33d954f7bfa..2dc56282a79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.broker.service.utils;
import java.util.Queue;
-
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
+import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
@@ -168,6 +172,35 @@ public class ClientChannelHelper {
protected void
handlePartitionResponse(CommandPartitionedTopicMetadataResponse response) {
queue.offer(new
CommandPartitionedTopicMetadataResponse().copyFrom(response));
}
+
+ @Override
+ protected void handleAddPartitionToTxnResponse(
+ CommandAddPartitionToTxnResponse
commandAddPartitionToTxnResponse) {
+ queue.offer(new
CommandAddPartitionToTxnResponse().copyFrom(commandAddPartitionToTxnResponse));
+ }
+
+ @Override
+ protected void handleAddSubscriptionToTxnResponse(
+ CommandAddSubscriptionToTxnResponse
commandAddSubscriptionToTxnResponse) {
+ queue.offer(new
CommandAddSubscriptionToTxnResponse().copyFrom(commandAddSubscriptionToTxnResponse));
+ }
+
+ @Override
+ protected void handleEndTxnResponse(CommandEndTxnResponse
commandEndTxnResponse) {
+ queue.offer(new
CommandEndTxnResponse().copyFrom(commandEndTxnResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnPartitionResponse(
+ CommandEndTxnOnPartitionResponse
commandEndTxnOnPartitionResponse) {
+ queue.offer(new
CommandEndTxnOnPartitionResponse().copyFrom(commandEndTxnOnPartitionResponse));
+ }
+
+ @Override
+ protected void handleEndTxnOnSubscriptionResponse(
+ CommandEndTxnOnSubscriptionResponse
commandEndTxnOnSubscriptionResponse) {
+ queue.offer(new
CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse));
+ }
};
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 885feb09cd4..9e56c00bbb7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -216,9 +216,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
void handleNewTxnResponse(CommandNewTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -226,14 +226,13 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForTxnIdCallBack op = (OpForTxnIdCallBack)
pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got new txn response for timeout {} - {}",
txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got new txn response for transaction {}",
txnID);
}
return;
}
@@ -300,9 +299,9 @@ public class TransactionMetaStoreHandler extends HandlerState
}
void
handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse
response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -310,14 +309,13 @@ public class TransactionMetaStoreHandler extends HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack)
pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got add publish partition to txn response for
timeout {} - {}", txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got add publish partition to txn response for
transaction {}", txnID);
}
return;
}
@@ -351,8 +349,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
, op.backoff.next(), TimeUnit.MILLISECONDS);
return;
}
- LOG.error("{} for request {} error {} with txnID {}.",
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
- requestId, error, txnID);
+ LOG.error("{} for request {}, transaction {}, error: {}",
+ BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
requestId, txnID, error);
}
@@ -384,9 +382,9 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
public void
handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse
response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -394,7 +392,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
error = null;
message = null;
}
- long requestId = response.getRequestId();
+ final long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack)
pendingRequests.remove(requestId);
if (op == null) {
@@ -410,8 +409,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
op.callback.complete(null);
} else {
- LOG.error("Add subscription to txn failed for request {} error
{}.",
- requestId, error);
+ LOG.error("Add subscription to txn failed for request {},
transaction {}, error: {}",
+ requestId, txnID, error);
if (checkIfNeedRetryByError(error, message, op)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Get a response for {} request {} error
TransactionCoordinatorNotFound and try it"
@@ -465,9 +464,9 @@ public class TransactionMetaStoreHandler extends
HandlerState
}
void handleEndTxnResponse(CommandEndTxnResponse response) {
- boolean hasError = response.hasError();
- ServerError error;
- String message;
+ final boolean hasError = response.hasError();
+ final ServerError error;
+ final String message;
if (hasError) {
error = response.getError();
message = response.getMessage();
@@ -475,21 +474,20 @@ public class TransactionMetaStoreHandler extends
HandlerState
error = null;
message = null;
}
- TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
- long requestId = response.getRequestId();
+ final TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
+ final long requestId = response.getRequestId();
internalPinnedExecutor.execute(() -> {
OpForVoidCallBack op = (OpForVoidCallBack)
pendingRequests.remove(requestId);
if (op == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got end txn response for timeout {} - {}",
txnID.getMostSigBits(),
- txnID.getLeastSigBits());
+ LOG.debug("Got end txn response for transaction but no
requests pending for txn {}", txnID);
}
return;
}
if (!hasError) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got end txn response success for request {}",
requestId);
+ LOG.debug("Got end txn response success for request {},
txn {}", requestId, txnID);
}
op.callback.complete(null);
} else {
@@ -516,8 +514,8 @@ public class TransactionMetaStoreHandler extends
HandlerState
, op.backoff.next(), TimeUnit.MILLISECONDS);
return;
}
- LOG.error("Got {} response for request {} error {}",
BaseCommand.Type.END_TXN.name(),
- requestId, error);
+ LOG.error("Got {} response for request {}, transaction {},
error: {}",
+ BaseCommand.Type.END_TXN.name(), requestId, txnID,
error);
}
onResponse(op);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index fdb94c17795..4d4d64f7fe9 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1367,12 +1367,16 @@ public class Commands {
return serializeWithSize(cmd);
}
- public static ByteBuf newAddPartitionToTxnResponse(long requestId, long
txnIdMostBits, ServerError error,
- String errorMsg) {
+ public static ByteBuf newAddPartitionToTxnResponse(long requestId,
+ long txnIdLeastBits,
+ long txnIdMostBits,
+ ServerError error,
+ String errorMsg) {
BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
CommandAddPartitionToTxnResponse response =
cmd.setAddPartitionToTxnResponse()
.setRequestId(requestId)
.setError(error)
+ .setTxnidLeastBits(txnIdLeastBits)
.setTxnidMostBits(txnIdMostBits);
if (errorMsg != null) {
@@ -1401,12 +1405,13 @@ public class Commands {
return serializeWithSize(cmd);
}
- public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long
txnIdMostBits, ServerError error,
- String errorMsg) {
+ public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long
txnIdLeastBits,
+ long txnIdMostBits,
ServerError error, String errorMsg) {
BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
CommandAddSubscriptionToTxnResponse response =
cmd.setAddSubscriptionToTxnResponse()
.setRequestId(requestId)
.setTxnidMostBits(txnIdMostBits)
+ .setTxnidLeastBits(txnIdLeastBits)
.setError(error);
if (errorMsg != null) {
response.setMessage(errorMsg);
@@ -1432,11 +1437,12 @@ public class Commands {
return cmd;
}
- public static BaseCommand newEndTxnResponse(long requestId, long
txnIdMostBits,
+ public static BaseCommand newEndTxnResponse(long requestId, long
txnIdLeastBits, long txnIdMostBits,
ServerError error, String
errorMsg) {
BaseCommand cmd = localCmd(Type.END_TXN_RESPONSE);
CommandEndTxnResponse response = cmd.setEndTxnResponse()
.setRequestId(requestId)
+ .setTxnidLeastBits(txnIdLeastBits)
.setTxnidMostBits(txnIdMostBits)
.setError(error);
if (errorMsg != null) {