This is an automated email from the ASF dual-hosted git repository.
zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new f023db7b3ed [improve][txn] Handle changeToReadyState failure correctly
in TC client (#19308)
f023db7b3ed is described below
commit f023db7b3ed95c4638f6ed4dcdbb48cc1f8428dc
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 21 11:42:43 2022 +0800
[improve][txn] Handle changeToReadyState failure correctly in TC client
(#19308)
(cherry picked from commit 644be5fdd6d080accffd72229b2e21e35a27d722)
---
.../client/impl/TransactionMetaStoreHandler.java | 37 ++++++------
.../impl/TransactionMetaStoreHandlerTest.java | 66 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 18 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 73df89dd6ae..a26b8ee41dd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -140,18 +140,10 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
internalPinnedExecutor.execute(() -> {
LOG.info("Transaction coordinator client connect
success! tcId : {}", transactionCoordinatorId);
- if (!changeToReadyState()) {
- setState(State.Closed);
- cnx.channel().close();
- }
-
- connectionHandler.setClientCnx(cnx);
-
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
- if (!this.connectFuture.isDone()) {
- this.connectFuture.complete(null);
+ if (registerToConnection(cnx)) {
+ this.connectionHandler.resetBackoff();
+ pendingRequests.forEach((requestID, opBase) ->
checkStateAndSendRequest(opBase));
}
- this.connectionHandler.resetBackoff();
- pendingRequests.forEach((requestID, opBase) ->
checkStateAndSendRequest(opBase));
});
}).exceptionally((e) -> {
internalPinnedExecutor.execute(() -> {
@@ -168,17 +160,26 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
return null;
});
} else {
- if (!changeToReadyState()) {
- cnx.channel().close();
- } else {
- connectionHandler.setClientCnx(cnx);
-
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
- }
- this.connectFuture.complete(null);
+ registerToConnection(cnx);
}
});
}
+ private boolean registerToConnection(ClientCnx cnx) { // binds this handler to cnx; returns true only if the state changed to Ready
+ if (changeToReadyState()) {
+ connectionHandler.setClientCnx(cnx);
+ cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId,
this);
+ connectFuture.complete(null); // success path: complete connectFuture normally
+ return true;
+ } else {
+ State state = getState(); // read the current state so the error message can name it
+ cnx.channel().close(); // this handler will not use the connection, so close its channel
+ connectFuture.completeExceptionally(
+ new IllegalStateException("Failed to change the state from
" + state + " to Ready"));
+ return false; // tells the caller that no registration happened
+ }
+ }
+
private void failPendingRequest() {
// this method is executed in internalPinnedExecutor.
pendingRequests.forEach((k, op) -> {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
new file mode 100644
index 00000000000..d7d0a2be00b
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.channel.Channel;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+public class TransactionMetaStoreHandlerTest { // exercises TransactionMetaStoreHandler.connectionOpened against a mocked ClientCnx
+
+ @Test
+ public void testStateChangeFailure() throws Exception { // expects connectFuture to fail when the handler is already Terminated
+ final PulsarClientImpl client = (PulsarClientImpl)
PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650").build();
+ final CompletableFuture<Void> connectFuture = new
CompletableFuture<>();
+ final TransactionMetaStoreHandler handler = new
TransactionMetaStoreHandler(
+ 0L, client, "topic", connectFuture);
+ final ClientCnx cnx = mock(ClientCnx.class); // mocked connection handed directly to connectionOpened below
+ when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(19); // NOTE(review): presumably high enough to take the sendRequestWithId path - confirm
+ final CompletableFuture<ProducerResponse> responseFuture =
CompletableFuture.completedFuture(null); // already completed, so any callback chained on it can run immediately
+ when(cnx.sendRequestWithId(any(),
anyLong())).thenReturn(responseFuture);
+
+ final Channel channel = mock(Channel.class);
+ when(cnx.channel()).thenReturn(channel); // lets the handler call cnx.channel().close() without a NullPointerException
+
+ // Set an invalid state so that the state change will fail
+ handler.setState(HandlerState.State.Terminated);
+ handler.connectionOpened(cnx);
+
+
Awaitility.await().atMost(Duration.ofSeconds(3)).until(connectFuture::isDone);
+
+ assertTrue(connectFuture.isCompletedExceptionally()); // the failure must be reported through connectFuture
+ assertEquals(handler.getState(), HandlerState.State.Terminated); // the handler state must be left as it was set above
+ verify(cnx, times(0)).registerTransactionMetaStoreHandler(anyLong(),
any()); // the handler must never be registered on the connection
+
+ client.close();
+ }
+}