This is an automated email from the ASF dual-hosted git repository.
tasanuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 42b4525f75b HDFS-17156. Client may receive old state ID which will
lead to inconsistent reads. (#5951)
42b4525f75b is described below
commit 42b4525f75b828bf58170187f030b08622e238ab
Author: Chunyi Yang <[email protected]>
AuthorDate: Fri Aug 18 01:56:34 2023 +0900
HDFS-17156. Client may receive old state ID which will lead to inconsistent
reads. (#5951)
Reviewed-by: Simbarashe Dzinamarira <[email protected]>
Signed-off-by: Takanobu Asanuma <[email protected]>
---
.../main/java/org/apache/hadoop/ipc/Client.java | 2 +-
.../test/java/org/apache/hadoop/ipc/TestIPC.java | 40 +++++++++++++++++++++-
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 0fb1fd7abff..4ccb4254c71 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1214,10 +1214,10 @@ public class Client implements AutoCloseable {
if (status == RpcStatusProto.SUCCESS) {
Writable value = packet.newInstance(valueClass, conf);
final Call call = calls.remove(callId);
- call.setRpcResponse(value);
if (call.alignmentContext != null) {
call.alignmentContext.receiveResponseState(header);
}
+ call.setRpcResponse(value);
}
// verify that packet length was correct
if (packet.remaining() > 0) {
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 394bcfc8969..7cfd65d4821 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
@@ -162,9 +163,15 @@ public class TestIPC {
static LongWritable call(Client client, LongWritable param,
InetSocketAddress addr, int rpcTimeout, Configuration conf)
throws IOException {
+ return call(client, param, addr, rpcTimeout, conf, null);
+ }
+
+ static LongWritable call(Client client, LongWritable param,
+ InetSocketAddress addr, int rpcTimeout, Configuration conf,
AlignmentContext alignmentContext)
+ throws IOException {
final ConnectionId remoteId = getConnectionId(addr, rpcTimeout, conf);
return (LongWritable)client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
- RPC.RPC_SERVICE_CLASS_DEFAULT, null);
+ RPC.RPC_SERVICE_CLASS_DEFAULT, null, alignmentContext);
}
static class TestServer extends Server {
@@ -1330,6 +1337,37 @@ public class TestIPC {
server.stop();
}
}
+
+ /**
+ * Verify that stateID is received into call before
+ * caller is notified.
+ * @throws IOException
+ */
+ @Test(timeout=60000)
+ public void testReceiveStateBeforeCallerNotification() throws IOException {
+ AtomicBoolean stateReceived = new AtomicBoolean(false);
+ AlignmentContext alignmentContext = Mockito.mock(AlignmentContext.class);
+ Mockito.doAnswer((Answer<Void>) invocation -> {
+ Thread.sleep(1000);
+ stateReceived.set(true);
+ return null;
+ }).when(alignmentContext)
+
.receiveResponseState(any(RpcHeaderProtos.RpcResponseHeaderProto.class));
+
+ final Client client = new Client(LongWritable.class, conf);
+ final TestServer server = new TestServer(1, false);
+
+ try {
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ call(client, new LongWritable(RANDOM.nextLong()), addr,
+ 0, conf, alignmentContext);
+ Assert.assertTrue(stateReceived.get());
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
/** A dummy protocol */
interface DummyProtocol {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]