This is an automated email from the ASF dual-hosted git repository. tasanuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 42b4525f75b HDFS-17156. Client may receive old state ID which will lead to inconsistent reads. (#5951) 42b4525f75b is described below commit 42b4525f75b828bf58170187f030b08622e238ab Author: Chunyi Yang <32279893+chunyiy...@users.noreply.github.com> AuthorDate: Fri Aug 18 01:56:34 2023 +0900 HDFS-17156. Client may receive old state ID which will lead to inconsistent reads. (#5951) Reviewed-by: Simbarashe Dzinamarira <sdzinamar...@linkedin.com> Signed-off-by: Takanobu Asanuma <tasan...@apache.org> --- .../main/java/org/apache/hadoop/ipc/Client.java | 2 +- .../test/java/org/apache/hadoop/ipc/TestIPC.java | 40 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 0fb1fd7abff..4ccb4254c71 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1214,10 +1214,10 @@ public class Client implements AutoCloseable { if (status == RpcStatusProto.SUCCESS) { Writable value = packet.newInstance(valueClass, conf); final Call call = calls.remove(callId); - call.setRpcResponse(value); if (call.alignmentContext != null) { call.alignmentContext.receiveResponseState(header); } + call.setRpcResponse(value); } // verify that packet length was correct if (packet.remaining() > 0) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 394bcfc8969..7cfd65d4821 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; @@ -162,9 +163,15 @@ public class TestIPC { static LongWritable call(Client client, LongWritable param, InetSocketAddress addr, int rpcTimeout, Configuration conf) throws IOException { + return call(client, param, addr, rpcTimeout, conf, null); + } + + static LongWritable call(Client client, LongWritable param, + InetSocketAddress addr, int rpcTimeout, Configuration conf, AlignmentContext alignmentContext) + throws IOException { final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf); return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, - RPC.RPC_SERVICE_CLASS_DEFAULT, null); + RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext); } static class TestServer extends Server { @@ -1330,6 +1337,37 @@ public class TestIPC { server.stop(); } } + + /** + * Verify that stateID is received into call before + * caller is notified. + * @throws IOException + */ + @Test(timeout=60000) + public void testReceiveStateBeforeCallerNotification() throws IOException { + AtomicBoolean stateReceived = new AtomicBoolean(false); + AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class); + Mockito.doAnswer((Answer<Void>) invocation -> { + Thread.sleep(1000); + stateReceived.set(true); + return null; + }).when(alignmentContext) + .receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class)); + + final Client client = new Client(LongWritable.class, conf); + final TestServer server = new TestServer(1, false); + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + call(client, new LongWritable(RANDOM.nextLong()), addr, + 0, conf, alignmentContext); + Assert.assertTrue(stateReceived.get()); + } finally { + client.stop(); + server.stop(); + } + } /** A dummy protocol */ interface DummyProtocol { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org