HDFS-12977. [SBN read] Add stateId to RPC headers. Contributed by Plamen 
Jeliazkov.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c9d73437
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c9d73437
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c9d73437

Branch: refs/heads/HDFS-12943
Commit: c9d73437e881296282abe46a6b3196befc8cef72
Parents: 3a78e5f
Author: Plamen Jeliazkov <plamenj2...@gmail.com>
Authored: Tue Mar 20 18:48:40 2018 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Mon Dec 24 09:33:59 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/AlignmentContext.java |  51 ++++++++
 .../main/java/org/apache/hadoop/ipc/Client.java |   9 ++
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |   9 +-
 .../main/java/org/apache/hadoop/ipc/RPC.java    |  10 +-
 .../java/org/apache/hadoop/ipc/RpcEngine.java   |   5 +-
 .../main/java/org/apache/hadoop/ipc/Server.java |  15 ++-
 .../apache/hadoop/ipc/WritableRpcEngine.java    |  33 ++++-
 .../src/main/proto/RpcHeader.proto              |   1 +
 .../java/org/apache/hadoop/ipc/TestRPC.java     |   3 +-
 .../apache/hadoop/hdfs/ClientGCIContext.java    |  65 +++++++++
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   3 +
 .../server/namenode/GlobalStateIdContext.java   |  59 +++++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java |   1 +
 .../hadoop/hdfs/TestStateAlignmentContext.java  | 131 +++++++++++++++++++
 14 files changed, 383 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
new file mode 100644
index 0000000..f952325
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+
+/**
+ * This interface intends to align the state between client and server
+ * via RPC communication.
+ *
+ * This should be implemented separately on the client side and server side
+ * and can be used to pass state information on RPC responses from server
+ * to client.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public interface AlignmentContext {
+
+  /**
+   * This is the intended server method call to implement to pass state info
+   * during RPC response header construction.
+   * @param header The RPC response header builder.
+   */
+  void updateResponseState(RpcResponseHeaderProto.Builder header);
+
+  /**
+   * This is the intended client method call to implement to receive state info
+   * during RPC response processing.
+   * @param header The RPC response header.
+   */
+  void receiveResponseState(RpcResponseHeaderProto header);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 49f1e49..a33aa1c 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -103,6 +103,12 @@ public class Client implements AutoCloseable {
           return false;
         }
       };
+  private static AlignmentContext alignmentContext;
+
+  /** Set alignment context to use to fetch state alignment info from RPC. */
+  public static void setAlignmentContext(AlignmentContext ac) {
+    alignmentContext = ac;
+  }
 
   @SuppressWarnings("unchecked")
   @Unstable
@@ -1186,6 +1192,9 @@ public class Client implements AutoCloseable {
           final Call call = calls.remove(callId);
           call.setRpcResponse(value);
         }
+        if (alignmentContext != null) {
+          alignmentContext.receiveResponseState(header);
+        }
         // verify that packet length was correct
         if (packet.remaining() > 0) {
           throw new RpcClientException("RPC response length mismatch");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 70fde60..2734a95 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -337,11 +337,11 @@ public class ProtobufRpcEngine implements RpcEngine {
       String bindAddress, int port, int numHandlers, int numReaders,
       int queueSizePerHandler, boolean verbose, Configuration conf,
       SecretManager<? extends TokenIdentifier> secretManager,
-      String portRangeConfig)
+      String portRangeConfig, AlignmentContext alignmentContext)
       throws IOException {
     return new Server(protocol, protocolImpl, conf, bindAddress, port,
         numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-        portRangeConfig);
+        portRangeConfig, alignmentContext);
   }
   
   public static class Server extends RPC.Server {
@@ -410,18 +410,19 @@ public class ProtobufRpcEngine implements RpcEngine {
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
      * @param portRangeConfig A config parameter that can be used to restrict
-     * the range of ports used when port is 0 (an ephemeral port)
+     * @param alignmentContext provides server state info on client responses
      */
     public Server(Class<?> protocolClass, Object protocolImpl,
         Configuration conf, String bindAddress, int port, int numHandlers,
         int numReaders, int queueSizePerHandler, boolean verbose,
         SecretManager<? extends TokenIdentifier> secretManager, 
-        String portRangeConfig)
+        String portRangeConfig, AlignmentContext alignmentContext)
         throws IOException {
       super(bindAddress, port, null, numHandlers,
           numReaders, queueSizePerHandler, conf,
           serverNameFromClass(protocolImpl.getClass()), secretManager,
           portRangeConfig);
+      setAlignmentContext(alignmentContext);
       this.verbose = verbose;  
       registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
           protocolImpl);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 9cfadc7..36d5400 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -719,6 +719,7 @@ public class RPC {
     private final Configuration conf;    
     private SecretManager<? extends TokenIdentifier> secretManager = null;
     private String portRangeConfig = null;
+    private AlignmentContext alignmentContext = null;
     
     public Builder(Configuration conf) {
       this.conf = conf;
@@ -785,6 +786,12 @@ public class RPC {
       return this;
     }
     
+    /** Default: null */
+    public Builder setAlignmentContext(AlignmentContext alignmentContext) {
+      this.alignmentContext = alignmentContext;
+      return this;
+    }
+
     /**
      * Build the RPC Server. 
      * @throws IOException on error
@@ -804,7 +811,8 @@ public class RPC {
       return getProtocolEngine(this.protocol, this.conf).getServer(
           this.protocol, this.instance, this.bindAddress, this.port,
           this.numHandlers, this.numReaders, this.queueSizePerHandler,
-          this.verbose, this.conf, this.secretManager, this.portRangeConfig);
+          this.verbose, this.conf, this.secretManager, this.portRangeConfig,
+          this.alignmentContext);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index 047722e..8a43172 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -67,6 +67,7 @@ public interface RpcEngine {
    * @param secretManager The secret manager to use to validate incoming requests.
    * @param portRangeConfig A config parameter that can be used to restrict
    *        the range of ports used when port is 0 (an ephemeral port)
+   * @param alignmentContext provides server state info on client responses
    * @return The Server instance
    * @throws IOException on any error
    */
@@ -75,8 +76,8 @@ public interface RpcEngine {
                        int queueSizePerHandler, boolean verbose,
                        Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager,
-                       String portRangeConfig
-                       ) throws IOException;
+                       String portRangeConfig,
+                       AlignmentContext alignmentContext) throws IOException;
 
   /**
    * Returns a proxy for ProtocolMetaInfoPB, which uses the given connection

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index b0ab85c..7705874 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -140,11 +140,12 @@ public abstract class Server {
   private RpcSaslProto negotiateResponse;
   private ExceptionsHandler exceptionsHandler = new ExceptionsHandler();
   private Tracer tracer;
+  private AlignmentContext alignmentContext;
   /**
    * Logical name of the server used in metrics and monitor.
    */
   private final String serverName;
-  
+
   /**
    * Add exception classes for which server won't log stack traces.
    *
@@ -164,6 +165,15 @@ public abstract class Server {
   }
 
   /**
+   * Set alignment context to pass state info thru RPC.
+   *
+   * @param alignmentContext alignment state context
+   */
+  public void setAlignmentContext(AlignmentContext alignmentContext) {
+    this.alignmentContext = alignmentContext;
+  }
+
+  /**
    * ExceptionsHandler manages Exception groups for special handling
    * e.g., terse exception group for concise logging messages
    */
@@ -2977,6 +2987,9 @@ public abstract class Server {
     headerBuilder.setRetryCount(call.retryCount);
     headerBuilder.setStatus(status);
     headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
+    if(alignmentContext != null) {
+      alignmentContext.updateResponseState(headerBuilder);
+    }
 
     if (status == RpcStatusProto.SUCCESS) {
       RpcResponseHeaderProto header = headerBuilder.build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 0497931..507517b 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -323,11 +323,11 @@ public class WritableRpcEngine implements RpcEngine {
                       int numHandlers, int numReaders, int queueSizePerHandler,
                       boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager,
-                      String portRangeConfig) 
+                      String portRangeConfig, AlignmentContext alignmentContext)
     throws IOException {
     return new Server(protocolClass, protocolImpl, conf, bindAddress, port,
         numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-        portRangeConfig);
+        portRangeConfig, alignmentContext);
   }
 
 
@@ -397,18 +397,45 @@ public class WritableRpcEngine implements RpcEngine {
      * @param port the port to listen for connections on
      * @param numHandlers the number of method handler threads to run
      * @param verbose whether each call should be logged
+     *
+     * @deprecated use Server#Server(Class, Object,
+     *      Configuration, String, int, int, int, int, boolean, SecretManager)
      */
+    @Deprecated
     public Server(Class<?> protocolClass, Object protocolImpl,
         Configuration conf, String bindAddress,  int port,
         int numHandlers, int numReaders, int queueSizePerHandler, 
         boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
         String portRangeConfig) 
         throws IOException {
+      this(null, protocolImpl,  conf,  bindAddress,   port,
+          numHandlers,  numReaders,  queueSizePerHandler,  verbose,
+          secretManager, null, null);
+    }
+
+    /**
+     * Construct an RPC server.
+     * @param protocolClass - the protocol being registered
+     *     can be null for compatibility with old usage (see below for details)
+     * @param protocolImpl the protocol impl that will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     * @param alignmentContext provides server state info on client responses
+     */
+    public Server(Class<?> protocolClass, Object protocolImpl,
+        Configuration conf, String bindAddress,  int port,
+        int numHandlers, int numReaders, int queueSizePerHandler,
+        boolean verbose, SecretManager<? extends TokenIdentifier> secretManager,
+        String portRangeConfig, AlignmentContext alignmentContext)
+        throws IOException {
       super(bindAddress, port, null, numHandlers, numReaders,
           queueSizePerHandler, conf,
           serverNameFromClass(protocolImpl.getClass()), secretManager,
           portRangeConfig);
-
+      setAlignmentContext(alignmentContext);
       this.verbose = verbose;
       
       

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto 
b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index aa14616..bfe1301 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -155,6 +155,7 @@ message RpcResponseHeaderProto {
   optional RpcErrorCodeProto errorDetail = 6; // in case of error
   optional bytes clientId = 7; // Globally unique client ID
   optional sint32 retryCount = 8 [default = -1];
+  optional int64 stateId = 9; // The last written Global State ID
 }
 
 message RpcSaslProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index c99b403..f8f41ba 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -299,7 +299,8 @@ public class TestRPC extends TestRpcBase {
         int numHandlers, int numReaders, int queueSizePerHandler,
         boolean verbose, Configuration conf,
         SecretManager<? extends TokenIdentifier> secretManager,
-        String portRangeConfig) throws IOException {
+        String portRangeConfig, AlignmentContext alignmentContext)
+        throws IOException {
       return null;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
new file mode 100644
index 0000000..3d722f8
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+
+import java.util.concurrent.atomic.LongAccumulator;
+
+/**
+ * This is the client side implementation responsible for receiving
+ * state alignment info from server(s).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+class ClientGCIContext implements AlignmentContext {
+
+  private final DFSClient dfsClient;
+  private final LongAccumulator lastSeenStateId =
+      new LongAccumulator(Math::max, Long.MIN_VALUE);
+
+  /**
+   * Client side constructor.
+   * @param dfsClient client side state receiver
+   */
+  ClientGCIContext(DFSClient dfsClient) {
+    this.dfsClient = dfsClient;
+  }
+
+  /**
+   * Client side implementation only receives state alignment info.
+   * It does not provide state alignment info therefore this does nothing.
+   */
+  @Override
+  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Client side implementation for receiving state alignment info.
+   */
+  @Override
+  public void receiveResponseState(RpcResponseHeaderProto header) {
+    lastSeenStateId.accumulate(header.getStateId());
+    dfsClient.lastSeenStateId = lastSeenStateId.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2badbb1..ecb21c2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -166,6 +166,7 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
+import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
@@ -219,6 +220,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
   volatile long lastLeaseRenewal;
+  volatile long lastSeenStateId;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -396,6 +398,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.saslClient = new SaslDataTransferClient(
         conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
+    Client.setAlignmentContext(new ClientGCIContext(this));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
new file mode 100644
index 0000000..2d7d94e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+
+/**
+ * This is the server side implementation responsible for passing
+ * state alignment info to clients.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+class GlobalStateIdContext implements AlignmentContext {
+  private final FSNamesystem namesystem;
+
+  /**
+   * Server side constructor.
+   * @param namesystem server side state provider
+   */
+  GlobalStateIdContext(FSNamesystem namesystem) {
+    this.namesystem = namesystem;
+  }
+
+  /**
+   * Server side implementation for providing state alignment info.
+   */
+  @Override
+  public void updateResponseState(RpcResponseHeaderProto.Builder header) {
+    header.setStateId(namesystem.getLastWrittenTransactionId());
+  }
+
+  /**
+   * Server side implementation only provides state alignment info.
+   * It does not receive state alignment info therefore this does nothing.
+   */
+  @Override
+  public void receiveResponseState(RpcResponseHeaderProto header) {
+    // Do nothing.
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f967df4..6ab0c5c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -456,6 +456,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         .setNumHandlers(handlerCount)
         .setVerbose(false)
         .setSecretManager(namesystem.getDelegationTokenSecretManager())
+        .setAlignmentContext(new GlobalStateIdContext(namesystem))
         .build();
 
     // Add all the RPC protocols that the namenode implements

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c9d73437/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
new file mode 100644
index 0000000..590f702
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Class is used to test server sending state alignment information to clients
+ * via RPC and likewise clients receiving and updating their last known
+ * state alignment info.
+ * These tests check that after a single RPC call a client will have caught up
+ * to the most recent alignment state of the server.
+ */
+public class TestStateAlignmentContext {
+
+  static final long BLOCK_SIZE = 64 * 1024;
+  private static final int NUMDATANODES = 3;
+  private static final Configuration CONF = new HdfsConfiguration();
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+
+  @BeforeClass
+  public static void startUpCluster() throws IOException {
+    // disable block scanner
+    CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    // Set short retry timeouts so this test runs faster
+    CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+    CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
+        .build();
+    cluster.waitActive();
+  }
+
+  @Before
+  public void before() throws IOException {
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void shutDownCluster() throws IOException {
+    if (dfs != null) {
+      dfs.close();
+      dfs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @After
+  public void after() throws IOException {
+    dfs.close();
+  }
+
+  /**
+   * This test checks if after a client writes we can see the state id
+   * updated via the response.
+   */
+  @Test
+  public void testStateTransferOnWrite() throws Exception {
+    long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
+    DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
+    long clientState = dfs.dfs.lastSeenStateId;
+    long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
+    // Write(s) should have increased state. Check for greater than.
+    assertThat(clientState > preWriteState, is(true));
+    // Client and server state should be equal.
+    assertThat(clientState, is(postWriteState));
+  }
+
+  /**
+   * This test checks if after a client reads we can see the state id
+   * updated via the response.
+   */
+  @Test
+  public void testStateTransferOnRead() throws Exception {
+    DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
+    long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
+    DFSTestUtil.readFile(dfs, new Path("/testFile2"));
+    // Read should catch client up to last written state.
+    assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
+  }
+
+  /**
+   * This test checks that a fresh client starts with no state and is then
+   * updated with the server state by an RPC call.
+   */
+  @Test
+  public void testStateTransferOnFreshClient() throws Exception {
+    DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
+    long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
+    try (DistributedFileSystem clearDfs =
+             (DistributedFileSystem) FileSystem.get(CONF)) {
+      assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
+      DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
+      assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to