eolivelli commented on a change in pull request #11304:
URL: https://github.com/apache/pulsar/pull/11304#discussion_r685052574
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1998,43 +1998,68 @@ protected void
handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction:
[{}]", topic,
txnID, txnAction);
}
- CompletableFuture<Optional<Topic>> topicFuture =
service.getTopics().get(TopicName.get(topic).toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, t) -> {
- if (!optionalTopic.isPresent()) {
- log.error("handleEndTxnOnPartition fail ! The topic {}
does not exist in broker, "
- + "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
- requestId, ServerError.ServiceNotReady,
- "Topic " + topic + " is not found."));
- return;
- }
- optionalTopic.get().endTxn(txnID, txnAction,
command.getTxnidLeastBitsOfLowWatermark())
+ CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
+ topicFuture.thenAccept(optionalTopic -> {
+ if (optionalTopic.isPresent()) {
+ optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
- log.error("Handle endTxnOnPartition {}
failed.", topic, throwable);
+ log.error("handleEndTxnOnPartition fail!,
topic {}, txnId: [{}], "
+ + "txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId,
BrokerServiceException.getClientErrorCode(throwable),
- throwable.getMessage()));
+ throwable.getMessage(),
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
});
- });
- } else {
- log.error("handleEndTxnOnPartition faile ! The topic {} does not
exist in broker, "
- + "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
- ServerError.ServiceNotReady,
- "The topic " + topic + " is not exist in broker."));
- }
+
+ } else {
+ getBrokerService().getManagedLedgerFactory()
+
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
+ .thenAccept((b) -> {
+ if (b) {
+ log.error("handleEndTxnOnPartition fail ! The
topic {} does not exist in broker, "
+ + "txnId: [{}], txnAction:
[{}]", topic,
+ txnID, TxnAction.valueOf(txnAction));
+
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+ ServerError.ServiceNotReady,
+ "The topic " + topic + " is not exist
in broker.",
Review comment:
There is a typo here. What about: "The topic " + topic + " does not exist in
broker."
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
##########
@@ -74,76 +65,44 @@
import static org.testng.Assert.fail;
@Test(groups = "broker")
-public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
+public class TransactionBufferClientTest extends TransactionTestBase {
private static final Logger log =
LoggerFactory.getLogger(TransactionBufferClientTest.class);
private TransactionBufferClient tbClient;
TopicName partitionedTopicName = TopicName.get("persistent", "public",
"test", "tb-client");
int partitions = 10;
- BrokerService[] brokerServices;
private static final String namespace = "public/test";
- private EventLoopGroup eventLoopGroup;
-
@Override
- protected void afterSetup() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster",
ClusterData.builder().serviceUrl(pulsarServices[0].getWebServiceAddress()).build());
- pulsarAdmins[0].tenants().createTenant("public", new
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
- pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
-
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(),
partitions);
- String subName = "test";
-
pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(),
subName, MessageId.latest);
+ @BeforeClass(alwaysRun = true)
+ protected void setup() throws Exception {
+ setBrokerCount(3);
+ internalSetup();
+ String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+ String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
+ admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
Review comment:
Do we need 16 partitions for this test?
Is there a utility method to set up the system topics related to
Transactions?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -2045,70 +2070,82 @@ protected void
handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
final String topic = command.getSubscription().getTopic();
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getValue();
+ final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
+ final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
if (log.isDebugEnabled()) {
- log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction:
[{}]", topic,
+ log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}],
txnAction: [{}]", topic, subName,
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}
- CompletableFuture<Optional<Topic>> topicFuture =
service.getTopics().get(TopicName.get(topic).toString());
- if (topicFuture != null) {
- topicFuture.thenAccept(optionalTopic -> {
-
- if (!optionalTopic.isPresent()) {
- log.error("handleEndTxnOnSubscription fail! The topic {}
does not exist in broker, txnId: "
- + "[{}], txnAction: [{}]", topic,
- new TxnID(txnidMostBits, txnidLeastBits),
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnidLeastBits, txnidMostBits,
- ServerError.ServiceNotReady,
- "The topic " + topic + " is not exist in
broker."));
- return;
- }
-
+ CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
+ topicFuture.thenAccept(optionalTopic -> {
+ if (optionalTopic.isPresent()) {
Subscription subscription =
optionalTopic.get().getSubscription(subName);
if (subscription == null) {
- log.error("Topic {} subscription {} is not exist.",
optionalTopic.get().getName(), subName);
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnidLeastBits, txnidMostBits,
- ServerError.ServiceNotReady,
- "Topic " + optionalTopic.get().getName()
- + " subscription " + subName + " is not
exist."));
+ log.warn("handleEndTxnOnSubscription fail! "
+ + "topic {} subscription {} is not exist.
txnId: [{}], txnAction: [{}]",
Review comment:
"is not exist" -> "does not exist"
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1998,43 +1998,68 @@ protected void
handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final String topic = command.getTopic();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction:
[{}]", topic,
txnID, txnAction);
}
- CompletableFuture<Optional<Topic>> topicFuture =
service.getTopics().get(TopicName.get(topic).toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, t) -> {
- if (!optionalTopic.isPresent()) {
- log.error("handleEndTxnOnPartition fail ! The topic {}
does not exist in broker, "
- + "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
- requestId, ServerError.ServiceNotReady,
- "Topic " + topic + " is not found."));
- return;
- }
- optionalTopic.get().endTxn(txnID, txnAction,
command.getTxnidLeastBitsOfLowWatermark())
+ CompletableFuture<Optional<Topic>> topicFuture =
service.getTopicIfExists(TopicName.get(topic).toString());
+ topicFuture.thenAccept(optionalTopic -> {
+ if (optionalTopic.isPresent()) {
+ optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark)
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
- log.error("Handle endTxnOnPartition {}
failed.", topic, throwable);
+ log.error("handleEndTxnOnPartition fail!,
topic {}, txnId: [{}], "
+ + "txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction), throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId,
BrokerServiceException.getClientErrorCode(throwable),
- throwable.getMessage()));
+ throwable.getMessage(),
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(),
txnID.getMostSigBits()));
});
- });
- } else {
- log.error("handleEndTxnOnPartition faile ! The topic {} does not
exist in broker, "
- + "txnId: [{}], txnAction: [{}]", topic, txnID,
TxnAction.valueOf(txnAction));
- ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
- requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
- ServerError.ServiceNotReady,
- "The topic " + topic + " is not exist in broker."));
- }
+
+ } else {
+ getBrokerService().getManagedLedgerFactory()
+
.asyncExists(TopicName.get(topic).getPersistenceNamingEncoding())
+ .thenAccept((b) -> {
+ if (b) {
+ log.error("handleEndTxnOnPartition fail ! The
topic {} does not exist in broker, "
+ + "txnId: [{}], txnAction:
[{}]", topic,
+ txnID, TxnAction.valueOf(txnAction));
+
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
+ ServerError.ServiceNotReady,
+ "The topic " + topic + " is not exist
in broker.",
+ txnID.getMostSigBits(),
txnID.getLeastSigBits()));
+ } else {
+ log.warn("handleEndTxnOnPartition fail ! The
topic {} have not been created, "
Review comment:
Typo: "have not been created" -> "has not been created"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]