Repository: hadoop Updated Branches: refs/heads/trunk 27ffec7ba -> 5fb14e063
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java new file mode 100644 index 0000000..128240d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.csi.adaptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import csi.v0.Csi; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.client.NMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.csi.client.CsiClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER; +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM; + +/** + * UT for {@link CsiAdaptorProtocolService}. 
+ */ +public class TestCsiAdaptorService { + + private static File testRoot = null; + private static String domainSocket = null; + + @BeforeClass + public static void setUp() throws IOException { + testRoot = GenericTestUtils.getTestDir("csi-test"); + File socketPath = new File(testRoot, "csi.sock"); + FileUtils.forceMkdirParent(socketPath); + domainSocket = "unix://" + socketPath.getAbsolutePath(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (testRoot != null) { + FileUtils.deleteDirectory(testRoot); + } + } + + @Test + public void testValidateVolume() throws IOException, YarnException { + ServerSocket ss = new ServerSocket(0); + ss.close(); + InetSocketAddress address = new InetSocketAddress(ss.getLocalPort()); + Configuration conf = new Configuration(); + conf.setSocketAddr( + YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address", + address); + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver", domainSocket); + + // inject a fake CSI client + // this client validates that the ValidateVolumeCapabilitiesRequest + // is intact, and then replies with a fake response + service.setCsiClient(new CsiClient() { + @Override + public Csi.GetPluginInfoResponse getPluginInfo() { + return Csi.GetPluginInfoResponse.newBuilder() + .setName("test-plugin") + .setVendorVersion("0.1") + .build(); + } + + @Override + public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + Csi.ValidateVolumeCapabilitiesRequest request) { + // validate we get all info from the request + Assert.assertEquals("volume-id-0000123", request.getVolumeId()); + Assert.assertEquals(1, request.getVolumeCapabilitiesCount()); + Assert.assertEquals(Csi.VolumeCapability.AccessMode + .newBuilder().setModeValue(5).build(), + request.getVolumeCapabilities(0).getAccessMode()); + Assert.assertTrue(request.getVolumeCapabilities(0).hasMount()); + Assert.assertEquals(2, request.getVolumeCapabilities(0) + 
.getMount().getMountFlagsCount()); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag1")); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag2")); + Assert.assertEquals(2, request.getVolumeAttributesCount()); + Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1")); + Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2")); + // return a fake result + return Csi.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(false) + .setMessage("this is a test") + .build(); + } + }); + + service.init(conf); + service.start(); + + try (CsiAdaptorProtocolPBClientImpl client = + new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) { + ValidateVolumeCapabilitiesRequest request = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + ImmutableList.of( + new ValidateVolumeCapabilitiesRequest + .VolumeCapability( + MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2"))), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + ValidateVolumeCapabilitiesResponse response = client + .validateVolumeCapacity(request); + + Assert.assertEquals(false, response.isSupported()); + Assert.assertEquals("this is a test", response.getResponseMessage()); + } finally { + service.stop(); + } + } + + @Test + public void testValidateVolumeWithNMProxy() throws Exception { + ServerSocket ss = new ServerSocket(0); + ss.close(); + InetSocketAddress address = new InetSocketAddress(ss.getLocalPort()); + Configuration conf = new Configuration(); + conf.setSocketAddr( + YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address", + address); + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver", domainSocket); + + // inject a fake CSI client + // this client validates that the ValidateVolumeCapabilitiesRequest + // is intact, and then replies with a fake response + 
service.setCsiClient(new CsiClient() { + @Override + public Csi.GetPluginInfoResponse getPluginInfo() { + return Csi.GetPluginInfoResponse.newBuilder() + .setName("test-plugin") + .setVendorVersion("0.1") + .build(); + } + + @Override + public Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities( + Csi.ValidateVolumeCapabilitiesRequest request) { + // validate we get all info from the request + Assert.assertEquals("volume-id-0000123", request.getVolumeId()); + Assert.assertEquals(1, request.getVolumeCapabilitiesCount()); + Assert.assertEquals(Csi.VolumeCapability.AccessMode + .newBuilder().setModeValue(5).build(), + request.getVolumeCapabilities(0).getAccessMode()); + Assert.assertTrue(request.getVolumeCapabilities(0).hasMount()); + Assert.assertEquals(2, request.getVolumeCapabilities(0) + .getMount().getMountFlagsCount()); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag1")); + Assert.assertTrue(request.getVolumeCapabilities(0) + .getMount().getMountFlagsList().contains("mountFlag2")); + Assert.assertEquals(2, request.getVolumeAttributesCount()); + Assert.assertEquals("v1", request.getVolumeAttributesMap().get("k1")); + Assert.assertEquals("v2", request.getVolumeAttributesMap().get("k2")); + // return a fake result + return Csi.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(false) + .setMessage("this is a test") + .build(); + } + }); + + service.init(conf); + service.start(); + + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + CsiAdaptorProtocol adaptorClient = NMProxy + .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc, + NetUtils.createSocketAddrForHost("localhost", ss.getLocalPort())); + ValidateVolumeCapabilitiesRequest request = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + ImmutableList.of(new ValidateVolumeCapabilitiesRequest + .VolumeCapability( + 
MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2"))), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + ValidateVolumeCapabilitiesResponse response = adaptorClient + .validateVolumeCapacity(request); + Assert.assertEquals(false, response.isSupported()); + Assert.assertEquals("this is a test", response.getResponseMessage()); + + service.stop(); + } + + @Test (expected = ServiceStateException.class) + public void testMissingConfiguration() { + Configuration conf = new Configuration(); + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver", domainSocket); + service.init(conf); + } + + @Test (expected = ServiceStateException.class) + public void testInvalidServicePort() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + + "test-driver-0001.address", + "0.0.0.0:-100"); // this is an invalid address + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver-0001", domainSocket); + service.init(conf); + } + + @Test (expected = ServiceStateException.class) + public void testInvalidHost() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + + "test-driver-0001.address", + "192.0.1:8999"); // this is an invalid ip address + CsiAdaptorProtocolService service = + new CsiAdaptorProtocolService("test-driver-0001", domainSocket); + service.init(conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java new 
file mode 100644 index 0000000..f1734c8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestGetPluginInfoRequestResponse.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.junit.Assert; +import org.junit.Test; + +/** + * Verify the integrity of GetPluginInfoRequest and GetPluginInfoResponse. 
+ */ +public class TestGetPluginInfoRequestResponse { + + @Test + public void testGetPluginInfoRequestPBRecord() { + CsiAdaptorProtos.GetPluginInfoRequest requestProto = + CsiAdaptorProtos.GetPluginInfoRequest.newBuilder().build(); + GetPluginInfoRequestPBImpl pbImpl = + new GetPluginInfoRequestPBImpl(requestProto); + Assert.assertNotNull(pbImpl); + Assert.assertEquals(requestProto, pbImpl.getProto()); + } + + @Test + public void testGetPluginInfoResponsePBRecord() { + CsiAdaptorProtos.GetPluginInfoResponse responseProto = + CsiAdaptorProtos.GetPluginInfoResponse.newBuilder() + .setName("test-driver") + .setVendorVersion("1.0.1") + .build(); + + GetPluginInfoResponsePBImpl pbImpl = + new GetPluginInfoResponsePBImpl(responseProto); + Assert.assertEquals("test-driver", pbImpl.getDriverName()); + Assert.assertEquals("1.0.1", pbImpl.getVersion()); + Assert.assertEquals(responseProto, pbImpl.getProto()); + + GetPluginInfoResponse pbImpl2 = GetPluginInfoResponsePBImpl + .newInstance("test-driver", "1.0.1"); + Assert.assertEquals("test-driver", pbImpl2.getDriverName()); + Assert.assertEquals("1.0.1", pbImpl2.getVersion()); + + CsiAdaptorProtos.GetPluginInfoResponse proto = + ((GetPluginInfoResponsePBImpl) pbImpl2).getProto(); + Assert.assertEquals("test-driver", proto.getName()); + Assert.assertEquals("1.0.1", proto.getVendorVersion()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java new file mode 100644 index 0000000..303cfc4 --- 
/dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityRequest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.MULTI_NODE_MULTI_WRITER; +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM; + +/** + * 
UT for message exchanges. + */ +public class TestValidateVolumeCapabilityRequest { + + @Test + public void testPBRecord() { + CsiAdaptorProtos.VolumeCapability vcProto = + CsiAdaptorProtos.VolumeCapability.newBuilder() + .setAccessMode(AccessMode.MULTI_NODE_MULTI_WRITER) + .setVolumeType(VolumeType.FILE_SYSTEM) + .addMountFlags("flag0") + .addMountFlags("flag1") + .build(); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest requestProto = + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest.newBuilder() + .setVolumeId("volume-id-0000001") + .addVolumeCapabilities(vcProto) + .addVolumeAttributes(YarnProtos.StringStringMapProto + .newBuilder().setKey("attr0") + .setValue("value0") + .build()) + .addVolumeAttributes(YarnProtos.StringStringMapProto + .newBuilder().setKey("attr1") + .setValue("value1") + .build()) + .build(); + + ValidateVolumeCapabilitiesRequestPBImpl request = + new ValidateVolumeCapabilitiesRequestPBImpl(requestProto); + + Assert.assertEquals("volume-id-0000001", request.getVolumeId()); + Assert.assertEquals(2, request.getVolumeAttributes().size()); + Assert.assertEquals("value0", request.getVolumeAttributes().get("attr0")); + Assert.assertEquals("value1", request.getVolumeAttributes().get("attr1")); + Assert.assertEquals(1, request.getVolumeCapabilities().size()); + VolumeCapability vc = + request.getVolumeCapabilities().get(0); + Assert.assertEquals(MULTI_NODE_MULTI_WRITER, vc.getAccessMode()); + Assert.assertEquals(FILE_SYSTEM, vc.getVolumeType()); + Assert.assertEquals(2, vc.getMountFlags().size()); + + Assert.assertEquals(requestProto, request.getProto()); + } + + @Test + public void testNewInstance() { + ValidateVolumeCapabilitiesRequest pbImpl = + ValidateVolumeCapabilitiesRequestPBImpl + .newInstance("volume-id-0000123", + ImmutableList.of( + new VolumeCapability( + MULTI_NODE_MULTI_WRITER, FILE_SYSTEM, + ImmutableList.of("mountFlag1", "mountFlag2"))), + ImmutableMap.of("k1", "v1", "k2", "v2")); + + 
Assert.assertEquals("volume-id-0000123", pbImpl.getVolumeId()); + Assert.assertEquals(1, pbImpl.getVolumeCapabilities().size()); + Assert.assertEquals(FILE_SYSTEM, + pbImpl.getVolumeCapabilities().get(0).getVolumeType()); + Assert.assertEquals(MULTI_NODE_MULTI_WRITER, + pbImpl.getVolumeCapabilities().get(0).getAccessMode()); + Assert.assertEquals(2, pbImpl.getVolumeAttributes().size()); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesRequest proto = + ((ValidateVolumeCapabilitiesRequestPBImpl) pbImpl).getProto(); + Assert.assertEquals("volume-id-0000123", proto.getVolumeId()); + Assert.assertEquals(1, proto.getVolumeCapabilitiesCount()); + Assert.assertEquals(AccessMode.MULTI_NODE_MULTI_WRITER, + proto.getVolumeCapabilities(0).getAccessMode()); + Assert.assertEquals(VolumeType.FILE_SYSTEM, + proto.getVolumeCapabilities(0).getVolumeType()); + Assert.assertEquals(2, proto.getVolumeCapabilities(0) + .getMountFlagsCount()); + Assert.assertEquals(2, proto.getVolumeCapabilities(0) + .getMountFlagsList().size()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java new file mode 100644 index 0000000..97f116a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestValidateVolumeCapabilityResponse.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.csi.adaptor; + +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl; +import org.apache.hadoop.yarn.proto.CsiAdaptorProtos; +import org.junit.Assert; +import org.junit.Test; + +/** + * UT for message exchanges. 
+ */ +public class TestValidateVolumeCapabilityResponse { + + @Test + public void testPBRecord() { + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto = + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse.newBuilder() + .setSupported(true) + .setMessage("capability is supported") + .build(); + + ValidateVolumeCapabilitiesResponsePBImpl pbImpl = + new ValidateVolumeCapabilitiesResponsePBImpl(proto); + + Assert.assertEquals(true, pbImpl.isSupported()); + Assert.assertEquals("capability is supported", pbImpl.getResponseMessage()); + Assert.assertEquals(proto, pbImpl.getProto()); + } + + @Test + public void testNewInstance() { + ValidateVolumeCapabilitiesResponse pbImpl = + ValidateVolumeCapabilitiesResponsePBImpl + .newInstance(false, "capability not supported"); + Assert.assertEquals(false, pbImpl.isSupported()); + Assert.assertEquals("capability not supported", + pbImpl.getResponseMessage()); + + CsiAdaptorProtos.ValidateVolumeCapabilitiesResponse proto = + ((ValidateVolumeCapabilitiesResponsePBImpl) pbImpl).getProto(); + Assert.assertEquals(false, proto.getSupported()); + Assert.assertEquals("capability not supported", proto.getMessage()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java new file mode 100644 index 0000000..ecc7fc2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains UT classes for CSI adaptor. + */ +package org.apache.hadoop.yarn.csi.adaptor; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java deleted file mode 100644 index b894d4e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.volume.csi; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; - -/** - * Protocol for the CSI adaptor. - */ -@Private -@Unstable -public interface CsiAdaptorClientProtocol { - - void validateVolume() throws VolumeException; - - void controllerPublishVolume() throws VolumeException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java deleted file mode 100644 index 043e7ae..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java 
+++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; - -import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; -import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; - -/** - * Client talks to CSI adaptor. 
- */ -public class CsiAdaptorClient implements CsiAdaptorClientProtocol { - - @Override - public void validateVolume() throws VolumeException { - // TODO - } - - @Override public void controllerPublishVolume() throws VolumeException { - // TODO - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java index 5f2669d..32563cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java @@ -17,13 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; -import 
org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; import java.util.concurrent.ScheduledFuture; @@ -40,12 +39,8 @@ public interface VolumeManager { /** * @return all known volumes and their states. */ - @VisibleForTesting VolumeStates getVolumeStates(); - @VisibleForTesting - void setClient(CsiAdaptorClientProtocol client); - /** * Start to supervise on a volume. * @param volume @@ -60,4 +55,20 @@ public interface VolumeManager { */ ScheduledFuture<VolumeProvisioningResults> schedule( VolumeProvisioningTask volumeProvisioningTask, int delaySecond); + + /** + * Register a csi-driver-adaptor to the volume manager. + * @param driverName + * @param client + */ + void registerCsiDriverAdaptor(String driverName, CsiAdaptorProtocol client); + + /** + * Returns the csi-driver-adaptor client from cache by the given driver name. + * If the client is not found, null is returned. + * @param driverName + * @return a csi-driver-adaptor client working for given driver or null + * if the adaptor could not be found. 
+ */ + CsiAdaptorProtocol getAdaptorByDriverName(String driverName); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java index 5252f53..839d1bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java @@ -18,16 +18,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse; +import org.apache.hadoop.yarn.client.NMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import 
org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; -import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask; -import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -43,20 +55,84 @@ public class VolumeManagerImpl extends AbstractService private final VolumeStates volumeStates; private ScheduledExecutorService provisioningExecutor; - private CsiAdaptorClientProtocol adaptorClient; + private Map<String, CsiAdaptorProtocol> csiAdaptorMap; private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10; public VolumeManagerImpl() { super(VolumeManagerImpl.class.getName()); this.volumeStates = new VolumeStates(); + this.csiAdaptorMap = new ConcurrentHashMap<>(); this.provisioningExecutor = Executors .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE); - this.adaptorClient = new CsiAdaptorClient(); + } + + // Init the CSI adaptor cache according to the configuration. + // user only needs to configure a list of adaptor addresses, + // this method extracts each address and init an adaptor client, + // then proceed with a hand-shake by calling adaptor's getPluginInfo + // method to retrieve the driver info. If the driver can be resolved, + // it is then added to the cache. Note, we don't allow two drivers + // specified with same driver-name even version is different. 
+ private void initCsiAdaptorCache( + final Map<String, CsiAdaptorProtocol> adaptorMap, Configuration conf) + throws IOException, YarnException { + LOG.info("Initializing cache for csi-driver-adaptors"); + String[] addresses = + conf.getStrings(YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES); + if (addresses != null && addresses.length > 0) { + for (String addr : addresses) { + LOG.info("Found csi-driver-adaptor socket address: " + addr); + InetSocketAddress address = NetUtils.createSocketAddr(addr); + YarnRPC rpc = YarnRPC.create(conf); + UserGroupInformation currentUser = + UserGroupInformation.getCurrentUser(); + CsiAdaptorProtocol adaptorClient = NMProxy + .createNMProxy(conf, CsiAdaptorProtocol.class, currentUser, rpc, + address); + // Attempt to resolve the driver by contacting + // the driver's identity service on the given address. + // If the call fails, the initialization fails as well, + // in order to avoid running into an inconsistent state. + LOG.info("Retrieving info from csi-driver-adaptor on address " + addr); + GetPluginInfoResponse response = + adaptorClient.getPluginInfo(GetPluginInfoRequest.newInstance()); + if (!Strings.isNullOrEmpty(response.getDriverName())) { + String driverName = response.getDriverName(); + if (adaptorMap.containsKey(driverName)) { + throw new YarnException( + "Duplicate driver adaptor found," + " driver name: " + + driverName); + } + adaptorMap.put(driverName, adaptorClient); + LOG.info("CSI Adaptor added to the cache, adaptor name: " + driverName + + ", driver version: " + response.getVersion()); + } + } + } + } + + /** + * Returns a CsiAdaptorProtocol client by the given driver name, + * returns null if no adaptor is found for the driver, that means + * the driver has not been registered to the volume manager yet, hence not valid.
+ * @param driverName the name of the driver + * @return CsiAdaptorProtocol client or null if driver not registered + */ + public CsiAdaptorProtocol getAdaptorByDriverName(String driverName) { + return csiAdaptorMap.get(driverName); + } + + @VisibleForTesting + @Override + public void registerCsiDriverAdaptor(String driverName, + CsiAdaptorProtocol client) { + this.csiAdaptorMap.put(driverName, client); } @Override protected void serviceInit(Configuration conf) throws Exception { + initCsiAdaptorCache(csiAdaptorMap, conf); super.serviceInit(conf); } @@ -82,18 +158,11 @@ public class VolumeManagerImpl extends AbstractService // volume already exists return volumeStates.getVolume(volume.getVolumeId()); } else { - // add the volume and set the client - ((VolumeImpl) volume).setClient(adaptorClient); this.volumeStates.addVolumeIfAbsent(volume); return volume; } } - @VisibleForTesting - public void setClient(CsiAdaptorClientProtocol client) { - this.adaptorClient = client; - } - @Override public ScheduledFuture<VolumeProvisioningResults> schedule( VolumeProvisioningTask volumeProvisioningTask, http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java index 68e89b0..83501ac 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent; import org.apache.hadoop.yarn.server.volume.csi.VolumeId; +import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; /** * Major volume interface at RM's view, it maintains the volume states and @@ -34,4 +36,10 @@ public interface Volume extends EventHandler<VolumeEvent> { VolumeState getVolumeState(); VolumeId getVolumeId(); + + VolumeMetaData getVolumeMeta(); + + CsiAdaptorProtocol getClient(); + + void setClient(CsiAdaptorProtocol client); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java index 2515047..82a4acb 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java @@ -18,10 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient; -import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -30,13 +35,16 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.server.volume.csi.VolumeId; import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; -import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException; +import java.io.IOException; import java.util.EnumSet; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import 
java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.AccessMode.SINGLE_NODE_READER_ONLY; +import static org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM; + /** * This class maintains the volume states and state transition * according to the CSI volume lifecycle. Volume states are stored in @@ -54,7 +62,7 @@ public class VolumeImpl implements Volume { private final VolumeId volumeId; private final VolumeMetaData volumeMeta; - private CsiAdaptorClientProtocol client; + private CsiAdaptorProtocol adaptorClient; public VolumeImpl(VolumeMetaData volumeMeta) { ReadWriteLock lock = new ReentrantReadWriteLock(); @@ -63,16 +71,21 @@ public class VolumeImpl implements Volume { this.volumeId = volumeMeta.getVolumeId(); this.volumeMeta = volumeMeta; this.stateMachine = createVolumeStateFactory().make(this); - this.client = new CsiAdaptorClient(); } @VisibleForTesting - public void setClient(CsiAdaptorClientProtocol client) { - this.client = client; + public void setClient(CsiAdaptorProtocol csiAdaptorClient) { + this.adaptorClient = csiAdaptorClient; } - public CsiAdaptorClientProtocol getClient() { - return this.client; + @Override + public CsiAdaptorProtocol getClient() { + return this.adaptorClient; + } + + @Override + public VolumeMetaData getVolumeMeta() { + return this.volumeMeta; } private StateMachineFactory<VolumeImpl, VolumeState, @@ -135,9 +148,20 @@ public class VolumeImpl implements Volume { VolumeEvent volumeEvent) { try { // this call could cross node, we should keep the message tight - volume.getClient().validateVolume(); - return VolumeState.VALIDATED; - } catch (VolumeException e) { + // TODO we should parse the capability from volume resource spec + VolumeCapability capability = new VolumeCapability( + SINGLE_NODE_READER_ONLY, FILE_SYSTEM, + ImmutableList.of()); + ValidateVolumeCapabilitiesRequest request = + 
ValidateVolumeCapabilitiesRequest + .newInstance(volume.getVolumeId().getId(), + ImmutableList.of(capability), + ImmutableMap.of()); + ValidateVolumeCapabilitiesResponse response = volume.getClient() + .validateVolumeCapacity(request); + return response.isSupported() ? VolumeState.VALIDATED + : VolumeState.UNAVAILABLE; + } catch (YarnException | IOException e) { LOG.warn("Got exception while calling the CSI adaptor", e); return VolumeState.UNAVAILABLE; } @@ -150,14 +174,8 @@ public class VolumeImpl implements Volume { @Override public VolumeState transition(VolumeImpl volume, VolumeEvent volumeEvent) { - try { - // this call could cross node, we should keep the message tight - volume.getClient().controllerPublishVolume(); - return VolumeState.NODE_READY; - } catch (VolumeException e) { - LOG.warn("Got exception while calling the CSI adaptor", e); - return volume.getVolumeState(); - } + // this call could cross node, we should keep the message tight + return VolumeState.NODE_READY; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java index f275768..814634a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext; import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -29,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager; @@ -142,8 +144,21 @@ public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor { * @param metaData * @return volume */ - private Volume checkAndGetVolume(VolumeMetaData metaData) { + private Volume checkAndGetVolume(VolumeMetaData metaData) + throws InvalidVolumeException { Volume toAdd = new VolumeImpl(metaData); + CsiAdaptorProtocol adaptor = volumeManager + .getAdaptorByDriverName(metaData.getDriverName()); + if (adaptor == null) { + throw new InvalidVolumeException("It seems for the driver name" + + " specified in the volume " + metaData.getDriverName() + + " ,there is no matched driver-adaptor can be found. " + + "Is the driver probably registered? 
Please check if" + + " adaptors service addresses defined in " + + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESSES + + " are correct and services are started."); + } + toAdd.setClient(adaptor); return this.volumeManager.addOrGetVolume(toAdd); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java index 18c23e8..a94f508 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent; -import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; -import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; @@ -29,8 +31,8 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.io.IOException; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.*; @@ -40,7 +42,13 @@ import static org.mockito.Mockito.*; public class TestVolumeLifecycle { @Test - public void testValidation() throws InvalidVolumeException { + public void testValidation() throws YarnException, IOException { + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder() .volumeId("test_vol_00000001") .maxCapability(5L) @@ -48,6 +56,7 @@ public class TestVolumeLifecycle { .mountPoint("/path/to/mount") .driverName("test-driver-name") .build(); + volume.setClient(mockedClient); Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); volume.handle(new ValidateVolumeEvent(volume)); @@ -55,16 +64,19 @@ public class TestVolumeLifecycle { } @Test - public void testValidationFailure() throws VolumeException { + public void testVolumeCapacityNotSupported() throws Exception { + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); + VolumeImpl volume = (VolumeImpl) VolumeBuilder .newBuilder().volumeId("test_vol_00000001").build(); - CsiAdaptorClientProtocol mockedClient = Mockito - .mock(CsiAdaptorClientProtocol.class); volume.setClient(mockedClient); // NEW -> UNAVAILABLE // Simulate a failed API call to 
the adaptor - doThrow(new VolumeException("failed")).when(mockedClient).validateVolume(); + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(false, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); volume.handle(new ValidateVolumeEvent(volume)); try { @@ -80,47 +92,62 @@ public class TestVolumeLifecycle { } @Test - public void testValidated() throws InvalidVolumeException { - AtomicInteger validatedTimes = new AtomicInteger(0); + public void testValidationFailure() throws YarnException, IOException { + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); + doThrow(new VolumeException("fail")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + + VolumeImpl volume = (VolumeImpl) VolumeBuilder + .newBuilder().volumeId("test_vol_00000001").build(); + volume.setClient(mockedClient); + + // NEW -> UNAVAILABLE + // Simulate a failed API call to the adaptor + doThrow(new VolumeException("failed")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + volume.handle(new ValidateVolumeEvent(volume)); + } + + @Test + public void testValidated() throws YarnException, IOException { VolumeImpl volume = (VolumeImpl) VolumeBuilder .newBuilder().volumeId("test_vol_00000001").build(); - CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() { - @Override - public void validateVolume() { - validatedTimes.incrementAndGet(); - } - - @Override - public void controllerPublishVolume() { - // do nothing - } - }; + CsiAdaptorProtocol mockedClient = Mockito.mock(CsiAdaptorProtocol.class); // The client has a count to memorize how many times being called volume.setClient(mockedClient); // NEW -> VALIDATED + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); Assert.assertEquals(VolumeState.NEW, 
volume.getVolumeState()); volume.handle(new ValidateVolumeEvent(volume)); Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); - Assert.assertEquals(1, validatedTimes.get()); + verify(mockedClient, times(1)) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); // VALIDATED -> VALIDATED volume.handle(new ValidateVolumeEvent(volume)); Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); - Assert.assertEquals(1, validatedTimes.get()); + verify(mockedClient, times(1)) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); } @Test - public void testUnavailableState() throws VolumeException { + public void testUnavailableState() throws YarnException, IOException { VolumeImpl volume = (VolumeImpl) VolumeBuilder .newBuilder().volumeId("test_vol_00000001").build(); - CsiAdaptorClientProtocol mockedClient = Mockito - .mock(CsiAdaptorClientProtocol.class); + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); volume.setClient(mockedClient); // NEW -> UNAVAILABLE - doThrow(new VolumeException("failed")).when(mockedClient) - .validateVolume(); + doThrow(new VolumeException("failed")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); volume.handle(new ValidateVolumeEvent(volume)); Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); @@ -130,23 +157,26 @@ public class TestVolumeLifecycle { Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); // UNAVAILABLE -> VALIDATED - doNothing().when(mockedClient).validateVolume(); + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); volume.setClient(mockedClient); volume.handle(new ValidateVolumeEvent(volume)); Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState()); } @Test - public 
void testPublishUnavailableVolume() throws VolumeException { + public void testPublishUnavailableVolume() throws YarnException, IOException { VolumeImpl volume = (VolumeImpl) VolumeBuilder .newBuilder().volumeId("test_vol_00000001").build(); - CsiAdaptorClientProtocol mockedClient = Mockito - .mock(CsiAdaptorClientProtocol.class); + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); volume.setClient(mockedClient); // NEW -> UNAVAILABLE (on validateVolume) - doThrow(new VolumeException("failed")).when(mockedClient) - .validateVolume(); + doThrow(new VolumeException("failed")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); Assert.assertEquals(VolumeState.NEW, volume.getVolumeState()); volume.handle(new ValidateVolumeEvent(volume)); Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); @@ -154,7 +184,7 @@ public class TestVolumeLifecycle { // UNAVAILABLE -> UNAVAILABLE (on publishVolume) volume.handle(new ControllerPublishVolumeEvent(volume)); // controller publish is not called since the state is UNAVAILABLE - verify(mockedClient, times(0)).controllerPublishVolume(); + // verify(mockedClient, times(0)).controllerPublishVolume(); // state remains to UNAVAILABLE Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5fb14e06/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java index d6f9d92..cee8fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java @@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -40,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState; import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor; -import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol; import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; import org.apache.hadoop.yarn.server.volume.csi.VolumeId; import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; @@ -57,6 
+59,10 @@ import java.io.FileWriter; import java.io.IOException; import java.util.Arrays; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; + /** * Test cases for volume processor. */ @@ -91,6 +97,7 @@ public class TestVolumeProcessor { conf.set(CapacitySchedulerConfiguration.PREFIX + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy", "fair"); + // this is required to enable volume processor conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS, VolumeAMSProcessor.class.getName()); mgr = new NullRMNodeLabelsManager(); @@ -155,6 +162,17 @@ public class TestVolumeProcessor { .schedulingRequests(Arrays.asList(sc)) .build(); + // inject adaptor client for testing + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); + rm.getRMContext().getVolumeManager() + .registerCsiDriverAdaptor("hostpath", mockedClient); + + // simulate validation succeed + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + am1.allocate(ar); VolumeStates volumeStates = rm.getRMContext().getVolumeManager().getVolumeStates(); @@ -212,12 +230,14 @@ public class TestVolumeProcessor { RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); - CsiAdaptorClientProtocol mockedClient = Mockito - .mock(CsiAdaptorClientProtocol.class); + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); // inject adaptor client - rm.getRMContext().getVolumeManager().setClient(mockedClient); - Mockito.doThrow(new VolumeException("failed")).when(mockedClient) - .validateVolume(); + rm.getRMContext().getVolumeManager() + .registerCsiDriverAdaptor("hostpath", mockedClient); + doThrow(new VolumeException("failed")) + .when(mockedClient) + 
.validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); Resource resource = Resource.newInstance(1024, 1); ResourceInformation volumeResource = ResourceInformation --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org