This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 42b4525f75b HDFS-17156. Client may receive old state ID which will 
lead to inconsistent reads. (#5951)
42b4525f75b is described below

commit 42b4525f75b828bf58170187f030b08622e238ab
Author: Chunyi Yang <32279893+chunyiy...@users.noreply.github.com>
AuthorDate: Fri Aug 18 01:56:34 2023 +0900

    HDFS-17156. Client may receive old state ID which will lead to inconsistent 
reads. (#5951)
    
    Reviewed-by: Simbarashe Dzinamarira <sdzinamar...@linkedin.com>
    Signed-off-by: Takanobu Asanuma <tasan...@apache.org>
---
 .../main/java/org/apache/hadoop/ipc/Client.java    |  2 +-
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   | 40 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 0fb1fd7abff..4ccb4254c71 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1214,10 +1214,10 @@ public class Client implements AutoCloseable {
         if (status == RpcStatusProto.SUCCESS) {
           Writable value = packet.newInstance(valueClass, conf);
           final Call call = calls.remove(callId);
-          call.setRpcResponse(value);
           if (call.alignmentContext != null) {
             call.alignmentContext.receiveResponseState(header);
           }
+          call.setRpcResponse(value);
         }
         // verify that packet length was correct
         if (packet.remaining() > 0) {
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 394bcfc8969..7cfd65d4821 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
@@ -162,9 +163,15 @@ public class TestIPC {
   static LongWritable call(Client client, LongWritable param,
       InetSocketAddress addr, int rpcTimeout, Configuration conf)
           throws IOException {
+    return call(client, param, addr, rpcTimeout, conf, null);
+  }
+
+  static LongWritable call(Client client, LongWritable param,
+      InetSocketAddress addr, int rpcTimeout, Configuration conf, 
AlignmentContext alignmentContext)
+      throws IOException {
     final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
     return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
-        RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+        RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext);
   }
 
   static class TestServer extends Server {
@@ -1330,6 +1337,37 @@ public class TestIPC {
       server.stop();
     }
   }
+
+  /**
+   * Verify that stateID is received into call before
+   * caller is notified.
+   * @throws IOException
+   */
+  @Test(timeout=60000)
+  public void testReceiveStateBeforeCallerNotification() throws IOException {
+    AtomicBoolean stateReceived = new AtomicBoolean(false);
+    AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class);
+    Mockito.doAnswer((Answer<Void>) invocation -> {
+      Thread.sleep(1000);
+      stateReceived.set(true);
+      return null;
+    }).when(alignmentContext)
+        
.receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class));
+
+    final Client client = new Client(LongWritable.class, conf);
+    final TestServer server = new TestServer(1, false);
+
+    try {
+      InetSocketAddress addr = NetUtils.getConnectAddress(server);
+      server.start();
+      call(client, new LongWritable(RANDOM.nextLong()), addr,
+          0, conf, alignmentContext);
+      Assert.assertTrue(stateReceived.get());
+    } finally {
+      client.stop();
+      server.stop();
+    }
+  }
   
   /** A dummy protocol */
   interface DummyProtocol {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to