Repository: hadoop
Updated Branches:
  refs/heads/trunk 27ffec7ba -> 5fb14e063


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
new file mode 100644
index 0000000..128240d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import csi.v0.Csi;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import 
org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * Unit tests for {@link CsiAdaptorProtocolService}.
+ */
+public class TestCsiAdaptorService {
+
+  private static File testRoot = null;
+  private static String domainSocket = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    testRoot = GenericTestUtils.getTestDir("csi-test");
+    File socketPath = new File(testRoot, "csi.sock");
+    FileUtils.forceMkdirParent(socketPath);
+    domainSocket = "unix://" + socketPath.getAbsolutePath();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (testRoot != null) {
+      FileUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  @Test
+  public void testValidateVolume() throws IOException, YarnException {
+    ServerSocket ss = new ServerSocket(0);
+    ss.close();
+    InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+    Configuration conf = new Configuration();
+    conf.setSocketAddr(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+        address);
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+    // Inject a fake CSI client.
+    // This client verifies that the ValidateVolumeCapabilitiesRequest
+    // arrives intact, and then replies with a fake response.
+    service.setCsiClient(new CsiClient() {
+      @Override
+      public Csi.GetPluginInfoResponse getPluginInfo() {
+        return Csi.GetPluginInfoResponse.newBuilder()
+            .setName("test-plugin")
+            .setVendorVersion("0.1")
+            .build();
+      }
+
+      @Override
+      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+          Csi.ValidateVolumeCapabilitiesRequest request) {
+        // validate we get all info from the request
+        Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+        Assert.assertEquals(Csi.VolumeCapability.AccessMode
+            .newBuilder().setModeValue(5).build(),
+            request.getVolumeCapabilities(0).getAccessMode());
+        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+        Assert.assertEquals(2, request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsCount());
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag1"));
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributesCount());
+        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+        // return a fake result
+        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+            .setSupported(false)
+            .setMessage("this is a test")
+            .build();
+      }
+    });
+
+    service.init(conf);
+    service.start();
+
+    try (CsiAdaptorProtocolPBClientImpl client =
+        new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
+      ValidateVolumeCapabilitiesRequest request =
+          ValidateVolumeCapabilitiesRequestPBImpl
+              .newInstance("volume-id-0000123",
+                  ImmutableList.of(
+                      new ValidateVolumeCapabilitiesRequest
+                          .VolumeCapability(
+                              MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                          ImmutableList.of("mountFlag1", "mountFlag2"))),
+              ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+      ValidateVolumeCapabilitiesResponse response = client
+          .validateVolumeCapacity(request);
+
+      Assert.assertEquals(false, response.isSupported());
+      Assert.assertEquals("this is a test", response.getResponseMessage());
+    } finally {
+      service.stop();
+    }
+  }
+
+  @Test
+  public void testValidateVolumeWithNMProxy() throws Exception {
+    ServerSocket ss = new ServerSocket(0);
+    ss.close();
+    InetSocketAddress address = new InetSocketAddress(ss.getLocalPort());
+    Configuration conf = new Configuration();
+    conf.setSocketAddr(
+        YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
+        address);
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+
+    // Inject a fake CSI client.
+    // This client verifies that the ValidateVolumeCapabilitiesRequest
+    // arrives intact, and then replies with a fake response.
+    service.setCsiClient(new CsiClient() {
+      @Override
+      public Csi.GetPluginInfoResponse getPluginInfo() {
+        return Csi.GetPluginInfoResponse.newBuilder()
+            .setName("test-plugin")
+            .setVendorVersion("0.1")
+            .build();
+      }
+
+      @Override
+      public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+          Csi.ValidateVolumeCapabilitiesRequest request) {
+        // validate we get all info from the request
+        Assert.assertEquals("volume-id-0000123", request.getVolumeId());
+        Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
+        Assert.assertEquals(Csi.VolumeCapability.AccessMode
+                .newBuilder().setModeValue(5).build(),
+            request.getVolumeCapabilities(0).getAccessMode());
+        Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
+        Assert.assertEquals(2, request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsCount());
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag1"));
+        Assert.assertTrue(request.getVolumeCapabilities(0)
+            .getMount().getMountFlagsList().contains("mountFlag2"));
+        Assert.assertEquals(2, request.getVolumeAttributesCount());
+        Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1"));
+        Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2"));
+        // return a fake result
+        return Csi.ValidateVolumeCapabilitiesResponse.newBuilder()
+            .setSupported(false)
+            .setMessage("this is a test")
+            .build();
+      }
+    });
+
+    service.init(conf);
+    service.start();
+
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    CsiAdaptorProtocol adaptorClient = NMProxy
+        .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+            NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort()));
+    ValidateVolumeCapabilitiesRequest request =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("volume-id-0000123",
+                ImmutableList.of(new ValidateVolumeCapabilitiesRequest
+                    .VolumeCapability(
+                        MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                    ImmutableList.of("mountFlag1", "mountFlag2"))),
+                ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+    ValidateVolumeCapabilitiesResponse response = adaptorClient
+        .validateVolumeCapacity(request);
+    Assert.assertEquals(false, response.isSupported());
+    Assert.assertEquals("this is a test", response.getResponseMessage());
+
+    service.stop();
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testMissingConfiguration() {
+    Configuration conf = new Configuration();
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver", domainSocket);
+    service.init(conf);
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testInvalidServicePort() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+        + "test-driver-0001.address",
+        "0.0.0.0:-100"); // this is an invalid address
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+    service.init(conf);
+  }
+
+  @Test (expected = ServiceStateException.class)
+  public void testInvalidHost() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
+            + "test-driver-0001.address",
+        "192.0.1:8999"); // this is an invalid ip address
+    CsiAdaptorProtocolService service =
+        new CsiAdaptorProtocolService("test-driver-0001", domainSocket);
+    service.init(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
new file mode 100644
index 0000000..f1734c8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the integrity of GetPluginInfoRequest and GetPluginInfoResponse.
+ */
+public class TestGetPluginInfoRequestResponse {
+
+  @Test
+  public void testGetPluginInfoRequestPBRecord() {
+    CsiAdaptorProtos.GetPluginInfoRequest requestProto =
+        CsiAdaptorProtos.GetPluginInfoRequest.newBuilder().build();
+    GetPluginInfoRequestPBImpl pbImpl =
+        new GetPluginInfoRequestPBImpl(requestProto);
+    Assert.assertNotNull(pbImpl);
+    Assert.assertEquals(requestProto, pbImpl.getProto());
+  }
+
+  @Test
+  public void testGetPluginInfoResponsePBRecord() {
+    CsiAdaptorProtos.GetPluginInfoResponse responseProto =
+        CsiAdaptorProtos.GetPluginInfoResponse.newBuilder()
+        .setName("test-driver")
+        .setVendorVersion("1.0.1")
+        .build();
+
+    GetPluginInfoResponsePBImpl pbImpl =
+        new GetPluginInfoResponsePBImpl(responseProto);
+    Assert.assertEquals("test-driver", pbImpl.getDriverName());
+    Assert.assertEquals("1.0.1", pbImpl.getVersion());
+    Assert.assertEquals(responseProto, pbImpl.getProto());
+
+    GetPluginInfoResponse pbImpl2 = GetPluginInfoResponsePBImpl
+        .newInstance("test-driver", "1.0.1");
+    Assert.assertEquals("test-driver", pbImpl2.getDriverName());
+    Assert.assertEquals("1.0.1", pbImpl2.getVersion());
+
+    CsiAdaptorProtos.GetPluginInfoResponse proto =
+        ((GetPluginInfoResponsePBImpl) pbImpl2).getProto();
+    Assert.assertEquals("test-driver", proto.getName());
+    Assert.assertEquals("1.0.1", proto.getVendorVersion());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
new file mode 100644
index 0000000..303cfc4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import 
org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode;
+import 
org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER;
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
+/**
+ * Unit tests for {@link ValidateVolumeCapabilitiesRequest} PB records.
+ */
+public class TestValidateVolumeCapabilityRequest {
+
+  @Test
+  public void testPBRecord() {
+    CsiAdaptorProtos.VolumeCapability vcProto =
+        CsiAdaptorProtos.VolumeCapability.newBuilder()
+            .setAccessMode(AccessMode.MULTI_NODE_MULTI_WRITER)
+            .setVolumeType(VolumeType.FILE_SYSTEM)
+            .addMountFlags("flag0")
+            .addMountFlags("flag1")
+            .build();
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto =
+        CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.newBuilder()
+            .setVolumeId("volume-id-0000001")
+            .addVolumeCapabilities(vcProto)
+            .addVolumeAttributes(YarnProtos.StringStringMapProto
+                .newBuilder().setKey("attr0")
+                .setValue("value0")
+                .build())
+            .addVolumeAttributes(YarnProtos.StringStringMapProto
+                .newBuilder().setKey("attr1")
+                .setValue("value1")
+                .build())
+            .build();
+
+    ValidateVolumeCapabilitiesRequestPBImpl request =
+        new ValidateVolumeCapabilitiesRequestPBImpl(requestProto);
+
+    Assert.assertEquals("volume-id-0000001", request.getVolumeId());
+    Assert.assertEquals(2, request.getVolumeAttributes().size());
+    Assert.assertEquals("value0", request.getVolumeAttributes().get("attr0"));
+    Assert.assertEquals("value1", request.getVolumeAttributes().get("attr1"));
+    Assert.assertEquals(1, request.getVolumeCapabilities().size());
+    VolumeCapability vc =
+        request.getVolumeCapabilities().get(0);
+    Assert.assertEquals(MULTI_NODE_MULTI_WRITER, vc.getAccessMode());
+    Assert.assertEquals(FILE_SYSTEM, vc.getVolumeType());
+    Assert.assertEquals(2, vc.getMountFlags().size());
+
+    Assert.assertEquals(requestProto, request.getProto());
+  }
+
+  @Test
+  public void testNewInstance() {
+    ValidateVolumeCapabilitiesRequest pbImpl =
+        ValidateVolumeCapabilitiesRequestPBImpl
+            .newInstance("volume-id-0000123",
+                ImmutableList.of(
+                    new VolumeCapability(
+                        MULTI_NODE_MULTI_WRITER, FILE_SYSTEM,
+                        ImmutableList.of("mountFlag1", "mountFlag2"))),
+                ImmutableMap.of("k1", "v1", "k2", "v2"));
+
+    Assert.assertEquals("volume-id-0000123", pbImpl.getVolumeId());
+    Assert.assertEquals(1, pbImpl.getVolumeCapabilities().size());
+    Assert.assertEquals(FILE_SYSTEM,
+        pbImpl.getVolumeCapabilities().get(0).getVolumeType());
+    Assert.assertEquals(MULTI_NODE_MULTI_WRITER,
+        pbImpl.getVolumeCapabilities().get(0).getAccessMode());
+    Assert.assertEquals(2, pbImpl.getVolumeAttributes().size());
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto =
+        ((ValidateVolumeCapabilitiesRequestPBImpl) pbImpl).getProto();
+    Assert.assertEquals("volume-id-0000123", proto.getVolumeId());
+    Assert.assertEquals(1, proto.getVolumeCapabilitiesCount());
+    Assert.assertEquals(AccessMode.MULTI_NODE_MULTI_WRITER,
+        proto.getVolumeCapabilities(0).getAccessMode());
+    Assert.assertEquals(VolumeType.FILE_SYSTEM,
+        proto.getVolumeCapabilities(0).getVolumeType());
+    Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+        .getMountFlagsCount());
+    Assert.assertEquals(2, proto.getVolumeCapabilities(0)
+        .getMountFlagsList().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
new file mode 100644
index 0000000..97f116a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link ValidateVolumeCapabilitiesResponse} PB records.
+ */
+public class TestValidateVolumeCapabilityResponse {
+
+  @Test
+  public void testPBRecord() {
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+        CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.newBuilder()
+        .setSupported(true)
+        .setMessage("capability is supported")
+        .build();
+
+    ValidateVolumeCapabilitiesResponsePBImpl pbImpl =
+        new ValidateVolumeCapabilitiesResponsePBImpl(proto);
+
+    Assert.assertEquals(true, pbImpl.isSupported());
+    Assert.assertEquals("capability is supported", 
pbImpl.getResponseMessage());
+    Assert.assertEquals(proto, pbImpl.getProto());
+  }
+
+  @Test
+  public void testNewInstance() {
+    ValidateVolumeCapabilitiesResponse pbImpl =
+        ValidateVolumeCapabilitiesResponsePBImpl
+            .newInstance(false, "capability not supported");
+    Assert.assertEquals(false, pbImpl.isSupported());
+    Assert.assertEquals("capability not supported",
+        pbImpl.getResponseMessage());
+
+    CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto =
+        ((ValidateVolumeCapabilitiesResponsePBImpl) pbImpl).getProto();
+    Assert.assertEquals(false, proto.getSupported());
+    Assert.assertEquals("capability not supported", proto.getMessage());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
new file mode 100644
index 0000000..ecc7fc2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains unit test classes for the CSI adaptor.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
deleted file mode 100644
index b894d4e..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.volume.csi;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
-
-/**
- * Protocol for the CSI adaptor.
- */
-@Private
-@Unstable
-public interface CsiAdaptorClientProtocol {
-
-  void validateVolume() throws VolumeException;
-
-  void controllerPublishVolume() throws VolumeException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
deleted file mode 100644
index 043e7ae..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
-
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
-
-/**
- * Client talks to CSI adaptor.
- */
-public class CsiAdaptorClient implements CsiAdaptorClientProtocol {
-
-  @Override
-  public void validateVolume() throws VolumeException {
-    // TODO
-  }
-
-  @Override public void controllerPublishVolume() throws VolumeException {
-    // TODO
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
index 5f2669d..32563cb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 
 import java.util.concurrent.ScheduledFuture;
 
@@ -40,12 +39,8 @@ public interface VolumeManager {
   /**
    * @return all known volumes and their states.
    */
-  @VisibleForTesting
   VolumeStates getVolumeStates();
 
-  @VisibleForTesting
-  void setClient(CsiAdaptorClientProtocol client);
-
   /**
    * Start to supervise on a volume.
    * @param volume
@@ -60,4 +55,20 @@ public interface VolumeManager {
    */
   ScheduledFuture<VolumeProvisioningResults> schedule(
       VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
+
+  /**
+   * Register a csi-driver-adaptor to the volume manager.
+   * @param driverName
+   * @param client
+   */
+  void registerCsiDriverAdaptor(String driverName, CsiAdaptorProtocol client);
+
+  /**
+   * Returns the csi-driver-adaptor client from cache by the given driver name.
+   * If the client is not found, null is returned.
+   * @param driverName
+   * @return a csi-driver-adaptor client working for given driver or null
+   * if the adaptor could not be found.
+   */
+  CsiAdaptorProtocol getAdaptorByDriverName(String driverName);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
index 5252f53..839d1bc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
@@ -18,16 +18,28 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.client.NMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
-import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -43,20 +55,84 @@ public class VolumeManagerImpl extends AbstractService
 
   private final VolumeStates volumeStates;
   private ScheduledExecutorService provisioningExecutor;
-  private CsiAdaptorClientProtocol adaptorClient;
+  private Map<String, CsiAdaptorProtocol> csiAdaptorMap;
 
   private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
 
   public VolumeManagerImpl() {
     super(VolumeManagerImpl.class.getName());
     this.volumeStates = new VolumeStates();
+    this.csiAdaptorMap = new ConcurrentHashMap<>();
     this.provisioningExecutor = Executors
         .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
-    this.adaptorClient = new CsiAdaptorClient();
+  }
+
+  // Init the CSI adaptor cache according to the configuration.
+  // The user only needs to configure a list of adaptor addresses;
+  // this method extracts each address, inits an adaptor client, and
+  // then proceeds with a hand-shake by calling the adaptor's getPluginInfo
+  // method to retrieve the driver info. If the driver can be resolved,
+  // it is then added to the cache. Note, we don't allow two drivers
+  // with the same driver-name, even if their versions are different.
+  private void initCsiAdaptorCache(
+      final Map<String, CsiAdaptorProtocol> adaptorMap, Configuration conf)
+      throws IOException, YarnException {
+    LOG.info("Initializing cache for csi-driver-adaptors");
+    String[] addresses =
+        conf.getStrings(YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES);
+    if (addresses != null && addresses.length > 0) {
+      for (String addr : addresses) {
+        LOG.info("Found csi-driver-adaptor socket address: " + addr);
+        InetSocketAddress address = NetUtils.createSocketAddr(addr);
+        YarnRPC rpc = YarnRPC.create(conf);
+        UserGroupInformation currentUser =
+            UserGroupInformation.getCurrentUser();
+        CsiAdaptorProtocol adaptorClient = NMProxy
+            .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc,
+                address);
+        // Attempt to resolve the driver by contacting
+        // the driver's identity service on the given address.
+        // If the call fails, the initialization also fails,
+        // in order to avoid running into an inconsistent state.
+        LOG.info("Retrieving info from csi-driver-adaptor on address " + addr);
+        GetPluginInfoResponse response =
+            adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance());
+        if (!Strings.isNullOrEmpty(response.getDriverName())) {
+          String driverName = response.getDriverName();
+          if (adaptorMap.containsKey(driverName)) {
+            throw new YarnException(
+                "Duplicate driver adaptor found," + " driver name: "
+                    + driverName);
+          }
+          adaptorMap.put(driverName, adaptorClient);
+          LOG.info("CSI Adaptor added to the cache, adaptor name: " + 
driverName
+              + ", driver version: " + response.getVersion());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a CsiAdaptorProtocol client by the given driver name,
+   * returns null if no adaptor is found for the driver, that means
+   * the driver has not registered to the volume manager yet, hence not valid.
+   * @param driverName the name of the driver
+   * @return CsiAdaptorProtocol client or null if driver not registered
+   */
+  public CsiAdaptorProtocol getAdaptorByDriverName(String driverName) {
+    return csiAdaptorMap.get(driverName);
+  }
+
+  @VisibleForTesting
+  @Override
+  public void registerCsiDriverAdaptor(String driverName,
+      CsiAdaptorProtocol client) {
+    this.csiAdaptorMap.put(driverName, client);
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    initCsiAdaptorCache(csiAdaptorMap, conf);
     super.serviceInit(conf);
   }
 
@@ -82,18 +158,11 @@ public class VolumeManagerImpl extends AbstractService
       // volume already exists
       return volumeStates.getVolume(volume.getVolumeId());
     } else {
-      // add the volume and set the client
-      ((VolumeImpl) volume).setClient(adaptorClient);
       this.volumeStates.addVolumeIfAbsent(volume);
       return volume;
     }
   }
 
-  @VisibleForTesting
-  public void setClient(CsiAdaptorClientProtocol client) {
-    this.adaptorClient = client;
-  }
-
   @Override
   public ScheduledFuture<VolumeProvisioningResults> schedule(
       VolumeProvisioningTask volumeProvisioningTask,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
index 68e89b0..83501ac 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
@@ -19,9 +19,11 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.event.EventHandler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
 
 /**
  * Major volume interface at RM's view, it maintains the volume states and
@@ -34,4 +36,10 @@ public interface Volume extends EventHandler<VolumeEvent> {
   VolumeState getVolumeState();
 
   VolumeId getVolumeId();
+
+  VolumeMetaData getVolumeMeta();
+
+  CsiAdaptorProtocol getClient();
+
+  void setClient(CsiAdaptorProtocol client);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
index 2515047..82a4acb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
@@ -18,10 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -30,13 +35,16 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
-import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
 
+import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.SINGLE_NODE_READER_ONLY;
+import static 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM;
+
 /**
  * This class maintains the volume states and state transition
  * according to the CSI volume lifecycle. Volume states are stored in
@@ -54,7 +62,7 @@ public class VolumeImpl implements Volume {
 
   private final VolumeId volumeId;
   private final VolumeMetaData volumeMeta;
-  private CsiAdaptorClientProtocol client;
+  private CsiAdaptorProtocol adaptorClient;
 
   public VolumeImpl(VolumeMetaData volumeMeta) {
     ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -63,16 +71,21 @@ public class VolumeImpl implements Volume {
     this.volumeId = volumeMeta.getVolumeId();
     this.volumeMeta = volumeMeta;
     this.stateMachine = createVolumeStateFactory().make(this);
-    this.client = new CsiAdaptorClient();
   }
 
   @VisibleForTesting
-  public void setClient(CsiAdaptorClientProtocol client) {
-    this.client = client;
+  public void setClient(CsiAdaptorProtocol csiAdaptorClient) {
+    this.adaptorClient = csiAdaptorClient;
   }
 
-  public CsiAdaptorClientProtocol getClient() {
-    return this.client;
+  @Override
+  public CsiAdaptorProtocol getClient() {
+    return this.adaptorClient;
+  }
+
+  @Override
+  public VolumeMetaData getVolumeMeta() {
+    return this.volumeMeta;
   }
 
   private StateMachineFactory<VolumeImpl, VolumeState,
@@ -135,9 +148,20 @@ public class VolumeImpl implements Volume {
         VolumeEvent volumeEvent) {
       try {
         // this call could cross node, we should keep the message tight
-        volume.getClient().validateVolume();
-        return VolumeState.VALIDATED;
-      } catch (VolumeException e) {
+        // TODO we should parse the capability from volume resource spec
+        VolumeCapability capability = new VolumeCapability(
+            SINGLE_NODE_READER_ONLY, FILE_SYSTEM,
+            ImmutableList.of());
+        ValidateVolumeCapabilitiesRequest request =
+            ValidateVolumeCapabilitiesRequest
+                .newInstance(volume.getVolumeId().getId(),
+                    ImmutableList.of(capability),
+                    ImmutableMap.of());
+        ValidateVolumeCapabilitiesResponse response = volume.getClient()
+            .validateVolumeCapacity(request);
+        return response.isSupported() ? VolumeState.VALIDATED
+            : VolumeState.UNAVAILABLE;
+      } catch (YarnException | IOException e) {
         LOG.warn("Got exception while calling the CSI adaptor", e);
         return VolumeState.UNAVAILABLE;
       }
@@ -150,14 +174,8 @@ public class VolumeImpl implements Volume {
     @Override
     public VolumeState transition(VolumeImpl volume,
         VolumeEvent volumeEvent) {
-      try {
-        // this call could cross node, we should keep the message tight
-        volume.getClient().controllerPublishVolume();
-        return VolumeState.NODE_READY;
-      } catch (VolumeException e) {
-        LOG.warn("Got exception while calling the CSI adaptor", e);
-        return volume.getVolumeState();
-      }
+      // this call could cross node, we should keep the message tight
+      return VolumeState.NODE_READY;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
index f275768..814634a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
 
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
 import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -29,6 +30,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
@@ -142,8 +144,21 @@ public class VolumeAMSProcessor implements 
ApplicationMasterServiceProcessor {
    * @param metaData
    * @return volume
    */
-  private Volume checkAndGetVolume(VolumeMetaData metaData) {
+  private Volume checkAndGetVolume(VolumeMetaData metaData)
+      throws InvalidVolumeException {
     Volume toAdd = new VolumeImpl(metaData);
+    CsiAdaptorProtocol adaptor = volumeManager
+        .getAdaptorByDriverName(metaData.getDriverName());
+    if (adaptor == null) {
+      throw new InvalidVolumeException("It seems for the driver name"
+          + " specified in the volume " + metaData.getDriverName()
+          + " ,there is no matched driver-adaptor can be found. "
+          + "Is the driver probably registered? Please check if"
+          + " adaptors service addresses defined in "
+          + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES
+          + " are correct and services are started.");
+    }
+    toAdd.setClient(adaptor);
     return this.volumeManager.addOrGetVolume(toAdd);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
index 18c23e8..a94f508 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
-import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
@@ -29,8 +31,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.Mockito.*;
 
@@ -40,7 +42,13 @@ import static org.mockito.Mockito.*;
 public class TestVolumeLifecycle {
 
   @Test
-  public void testValidation() throws InvalidVolumeException {
+  public void testValidation() throws YarnException, IOException {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+       .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
     VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder()
         .volumeId("test_vol_00000001")
         .maxCapability(5L)
@@ -48,6 +56,7 @@ public class TestVolumeLifecycle {
         .mountPoint("/path/to/mount")
         .driverName("test-driver-name")
         .build();
+    volume.setClient(mockedClient);
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
 
     volume.handle(new ValidateVolumeEvent(volume));
@@ -55,16 +64,19 @@ public class TestVolumeLifecycle {
   }
 
   @Test
-  public void testValidationFailure() throws VolumeException {
+  public void testVolumeCapacityNotSupported() throws Exception {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
     volume.setClient(mockedClient);
 
     // NEW -> UNAVAILABLE
     // Simulate a failed API call to the adaptor
-    doThrow(new VolumeException("failed")).when(mockedClient).validateVolume();
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(false, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     volume.handle(new ValidateVolumeEvent(volume));
 
     try {
@@ -80,47 +92,62 @@ public class TestVolumeLifecycle {
   }
 
   @Test
-  public void testValidated() throws InvalidVolumeException {
-    AtomicInteger validatedTimes = new AtomicInteger(0);
+  public void testValidationFailure() throws YarnException, IOException {
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    doThrow(new VolumeException("fail"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    // Simulate a failed API call to the adaptor
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+    volume.handle(new ValidateVolumeEvent(volume));
+  }
+
+  @Test
+  public void testValidated() throws YarnException, IOException {
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() {
-      @Override
-      public void validateVolume() {
-        validatedTimes.incrementAndGet();
-      }
-
-      @Override
-      public void controllerPublishVolume() {
-        // do nothing
-      }
-    };
+    CsiAdaptorProtocol mockedClient = Mockito.mock(CsiAdaptorProtocol.class);
     // The client has a count to memorize how many times being called
     volume.setClient(mockedClient);
 
     // NEW -> VALIDATED
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
-    Assert.assertEquals(1, validatedTimes.get());
+    verify(mockedClient, times(1))
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
 
     // VALIDATED -> VALIDATED
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
-    Assert.assertEquals(1, validatedTimes.get());
+    verify(mockedClient, times(1))
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
   }
 
   @Test
-  public void testUnavailableState() throws VolumeException {
+  public void testUnavailableState() throws YarnException, IOException {
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     volume.setClient(mockedClient);
 
     // NEW -> UNAVAILABLE
-    doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -130,23 +157,26 @@ public class TestVolumeLifecycle {
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
 
     // UNAVAILABLE -> VALIDATED
-    doNothing().when(mockedClient).validateVolume();
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     volume.setClient(mockedClient);
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
   }
 
   @Test
-  public void testPublishUnavailableVolume() throws VolumeException {
+  public void testPublishUnavailableVolume() throws YarnException, IOException 
{
     VolumeImpl volume = (VolumeImpl) VolumeBuilder
         .newBuilder().volumeId("test_vol_00000001").build();
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     volume.setClient(mockedClient);
 
     // NEW -> UNAVAILABLE (on validateVolume)
-    doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
     Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
     volume.handle(new ValidateVolumeEvent(volume));
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
@@ -154,7 +184,7 @@ public class TestVolumeLifecycle {
     // UNAVAILABLE -> UNAVAILABLE (on publishVolume)
     volume.handle(new ControllerPublishVolumeEvent(volume));
     // controller publish is not called since the state is UNAVAILABLE
-    verify(mockedClient, times(0)).controllerPublishVolume();
+    // verify(mockedClient, times(0)).controllerPublishVolume();
     // state remains to UNAVAILABLE
     Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
index d6f9d92..cee8fdf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
@@ -20,8 +20,11 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceSizing;
@@ -40,7 +43,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
 import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
-import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
 import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
 import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
 import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
@@ -57,6 +59,10 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Arrays;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
 /**
  * Test cases for volume processor.
  */
@@ -91,6 +97,7 @@ public class TestVolumeProcessor {
     conf.set(CapacitySchedulerConfiguration.PREFIX
         + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
         "fair");
+    // this is required to enable the volume processor
     conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
         VolumeAMSProcessor.class.getName());
     mgr = new NullRMNodeLabelsManager();
@@ -155,6 +162,17 @@ public class TestVolumeProcessor {
         .schedulingRequests(Arrays.asList(sc))
         .build();
 
+    // inject adaptor client for testing
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
+    rm.getRMContext().getVolumeManager()
+        .registerCsiDriverAdaptor("hostpath", mockedClient);
+
+    // simulate a successful validation
+    doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, ""))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
+
     am1.allocate(ar);
     VolumeStates volumeStates =
         rm.getRMContext().getVolumeManager().getVolumeStates();
@@ -212,12 +230,14 @@ public class TestVolumeProcessor {
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
 
-    CsiAdaptorClientProtocol mockedClient = Mockito
-        .mock(CsiAdaptorClientProtocol.class);
+    CsiAdaptorProtocol mockedClient = Mockito
+        .mock(CsiAdaptorProtocol.class);
     // inject adaptor client
-    rm.getRMContext().getVolumeManager().setClient(mockedClient);
-    Mockito.doThrow(new VolumeException("failed")).when(mockedClient)
-        .validateVolume();
+    rm.getRMContext().getVolumeManager()
+        .registerCsiDriverAdaptor("hostpath", mockedClient);
+    doThrow(new VolumeException("failed"))
+        .when(mockedClient)
+        .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class));
 
     Resource resource = Resource.newInstance(1024, 1);
     ResourceInformation volumeResource = ResourceInformation


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to