This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f023db7b3ed [improve][txn] Handle changeToReadyState failure correctly 
in TC client (#19308)
f023db7b3ed is described below

commit f023db7b3ed95c4638f6ed4dcdbb48cc1f8428dc
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 21 11:42:43 2022 +0800

    [improve][txn] Handle changeToReadyState failure correctly in TC client 
(#19308)
    
    (cherry picked from commit 644be5fdd6d080accffd72229b2e21e35a27d722)
---
 .../client/impl/TransactionMetaStoreHandler.java   | 37 ++++++------
 .../impl/TransactionMetaStoreHandlerTest.java      | 66 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 73df89dd6ae..a26b8ee41dd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -140,18 +140,10 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                 cnx.sendRequestWithId(request, requestId).thenRun(() -> {
                     internalPinnedExecutor.execute(() -> {
                         LOG.info("Transaction coordinator client connect 
success! tcId : {}", transactionCoordinatorId);
-                        if (!changeToReadyState()) {
-                            setState(State.Closed);
-                            cnx.channel().close();
-                        }
-
-                        connectionHandler.setClientCnx(cnx);
-                        
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
-                        if (!this.connectFuture.isDone()) {
-                            this.connectFuture.complete(null);
+                        if (registerToConnection(cnx)) {
+                            this.connectionHandler.resetBackoff();
+                            pendingRequests.forEach((requestID, opBase) -> 
checkStateAndSendRequest(opBase));
                         }
-                        this.connectionHandler.resetBackoff();
-                        pendingRequests.forEach((requestID, opBase) -> 
checkStateAndSendRequest(opBase));
                     });
                 }).exceptionally((e) -> {
                     internalPinnedExecutor.execute(() -> {
@@ -168,17 +160,26 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                     return null;
                 });
             } else {
-                if (!changeToReadyState()) {
-                    cnx.channel().close();
-                } else {
-                    connectionHandler.setClientCnx(cnx);
-                    
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, this);
-                }
-                this.connectFuture.complete(null);
+                registerToConnection(cnx);
             }
         });
     }
 
+    private boolean registerToConnection(ClientCnx cnx) {
+        if (changeToReadyState()) {
+            connectionHandler.setClientCnx(cnx);
+            cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, 
this);
+            connectFuture.complete(null);
+            return true;
+        } else {
+            State state = getState();
+            cnx.channel().close();
+            connectFuture.completeExceptionally(
+                    new IllegalStateException("Failed to change the state from 
" + state + " to Ready"));
+            return false;
+        }
+    }
+
     private void failPendingRequest() {
         // this method is executed in internalPinnedExecutor.
         pendingRequests.forEach((k, op) -> {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
new file mode 100644
index 00000000000..d7d0a2be00b
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandlerTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import io.netty.channel.Channel;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+public class TransactionMetaStoreHandlerTest {
+
+    @Test
+    public void testStateChangeFailure() throws Exception {
+        final PulsarClientImpl client = (PulsarClientImpl) 
PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650").build();
+        final CompletableFuture<Void> connectFuture = new 
CompletableFuture<>();
+        final TransactionMetaStoreHandler handler = new 
TransactionMetaStoreHandler(
+                0L, client, "topic", connectFuture);
+        final ClientCnx cnx = mock(ClientCnx.class);
+        when(cnx.getRemoteEndpointProtocolVersion()).thenReturn(19);
+        final CompletableFuture<ProducerResponse> responseFuture = 
CompletableFuture.completedFuture(null);
+        when(cnx.sendRequestWithId(any(), 
anyLong())).thenReturn(responseFuture);
+
+        final Channel channel = mock(Channel.class);
+        when(cnx.channel()).thenReturn(channel);
+
+        // Set an invalid state so that the state change will fail
+        handler.setState(HandlerState.State.Terminated);
+        handler.connectionOpened(cnx);
+
+        
Awaitility.await().atMost(Duration.ofSeconds(3)).until(connectFuture::isDone);
+
+        assertTrue(connectFuture.isCompletedExceptionally());
+        assertEquals(handler.getState(), HandlerState.State.Terminated);
+        verify(cnx, times(0)).registerTransactionMetaStoreHandler(anyLong(), 
any());
+
+        client.close();
+    }
+}

Reply via email to