http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
new file mode 100644
index 0000000..f275768
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/VolumeAMSProcessor.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
+
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * AMS processor that handles volume resource requests.
+ *
+ * <p>On every allocate call it collects the volumes referenced by the
+ * scheduling requests, submits a {@link VolumeProvisioningTask} to the
+ * {@link VolumeManager} and waits for its result before handing the request
+ * over to the next processor in the chain. Register and finish calls are
+ * passed straight through to the next processor.
+ */
+public class VolumeAMSProcessor implements ApplicationMasterServiceProcessor {
+
+  private static final Logger LOG =  LoggerFactory
+      .getLogger(VolumeAMSProcessor.class);
+
+  /** Maximum time, in seconds, to wait for a provisioning task result. */
+  private static final long PROVISIONING_TIMEOUT_SECONDS = 3L;
+
+  private ApplicationMasterServiceProcessor nextAMSProcessor;
+  private VolumeManager volumeManager;
+
+  /**
+   * Remembers the next processor in the chain and looks up the volume
+   * manager.
+   *
+   * @param amsContext AMS context; it is cast to {@link RMContext} in order
+   *                   to obtain the volume manager
+   * @param nextProcessor processor every call is delegated to afterwards
+   */
+  @Override
+  public void init(ApplicationMasterServiceContext amsContext,
+      ApplicationMasterServiceProcessor nextProcessor) {
+    LOG.info("Initializing CSI volume processor");
+    this.nextAMSProcessor = nextProcessor;
+    this.volumeManager = ((RMContext) amsContext).getVolumeManager();
+  }
+
+  /**
+   * Pass-through; registration involves no volume handling.
+   */
+  @Override
+  public void registerApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      RegisterApplicationMasterRequest request,
+      RegisterApplicationMasterResponse response)
+      throws IOException, YarnException {
+    this.nextAMSProcessor.registerApplicationMaster(applicationAttemptId,
+        request, response);
+  }
+
+  /**
+   * Provisions the volumes referenced by the request, then delegates to the
+   * next processor.
+   *
+   * @param appAttemptId attempt that sent the request
+   * @param request allocate request, possibly carrying volume resources
+   * @param response allocate response passed on to the next processor
+   * @throws YarnException if a requested volume is invalid, if provisioning
+   *         reports a failure, or if the provisioning task does not deliver
+   *         a result within {@link #PROVISIONING_TIMEOUT_SECONDS} seconds
+   */
+  @Override
+  public void allocate(ApplicationAttemptId appAttemptId,
+      AllocateRequest request, AllocateResponse response)
+      throws YarnException {
+    List<Volume> volumes = aggregateVolumesFrom(request);
+    if (!volumes.isEmpty()) {
+      ScheduledFuture<VolumeProvisioningResults> result =
+          this.volumeManager.schedule(new VolumeProvisioningTask(volumes), 0);
+      try {
+        VolumeProvisioningResults volumeResult =
+            result.get(PROVISIONING_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        if (!volumeResult.isSuccess()) {
+          throw new VolumeProvisioningException("Volume provisioning failed,"
+              + " result details: " + volumeResult.getBriefMessage());
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that code further up the stack
+        // can still observe that this thread was interrupted.
+        Thread.currentThread().interrupt();
+        LOG.warn("Volume provisioning task failed", e);
+        throw new VolumeException("Volume provisioning task failed", e);
+      } catch (TimeoutException | ExecutionException e) {
+        LOG.warn("Volume provisioning task failed", e);
+        throw new VolumeException("Volume provisioning task failed", e);
+      }
+    }
+
+    // Go to next processor
+    this.nextAMSProcessor.allocate(appAttemptId, request, response);
+  }
+
+  /**
+   * Collects the volumes referenced by the scheduling requests of the given
+   * allocate request. Currently only scheduling request is supported.
+   *
+   * @param request allocate request sent by the application master
+   * @return the volumes to provision, never null; empty when the request
+   *         has no scheduling requests or none of them asks for a volume
+   * @throws VolumeException if a volume with a positive minimum capacity
+   *         is not a pre-provisioned volume
+   */
+  private List<Volume> aggregateVolumesFrom(AllocateRequest request)
+      throws VolumeException {
+    List<Volume> volumeList = new ArrayList<>();
+    List<SchedulingRequest> requests = request.getSchedulingRequests();
+    if (requests != null) {
+      for (SchedulingRequest req : requests) {
+        Resource totalResource = req.getResourceSizing().getResources();
+        List<ResourceInformation> resourceList =
+            totalResource.getAllResourcesListCopy();
+        for (ResourceInformation resourceInformation : resourceList) {
+          List<VolumeMetaData> volumes =
+              VolumeMetaData.fromResource(resourceInformation);
+          for (VolumeMetaData vs : volumes) {
+            if (vs.getVolumeCapabilityRange().getMinCapacity() <= 0) {
+              // capacity not specified, ignore
+              continue;
+            } else if (vs.isProvisionedVolume()) {
+              volumeList.add(checkAndGetVolume(vs));
+            } else {
+              throw new InvalidVolumeException("Only pre-provisioned volume"
+                  + " is supported now, volumeID must exist.");
+            }
+          }
+        }
+      }
+    }
+    return volumeList;
+  }
+
+  /**
+   * If the given volume ID already exists in the volume manager,
+   * it returns the existing volume. Otherwise, it creates a new
+   * volume and adds that to the volume manager.
+   *
+   * @param metaData meta data describing the requested volume
+   * @return the volume handed back by the volume manager
+   */
+  private Volume checkAndGetVolume(VolumeMetaData metaData) {
+    Volume toAdd = new VolumeImpl(metaData);
+    return this.volumeManager.addOrGetVolume(toAdd);
+  }
+
+  /**
+   * Pass-through; finishing an application master involves no volume
+   * handling here.
+   */
+  @Override
+  public void finishApplicationMaster(
+      ApplicationAttemptId applicationAttemptId,
+      FinishApplicationMasterRequest request,
+      FinishApplicationMasterResponse response) {
+    this.nextAMSProcessor.finishApplicationMaster(applicationAttemptId,
+        request, response);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java
new file mode 100644
index 0000000..788a417
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/processor/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains AMS processor class for volume handling.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java
new file mode 100644
index 0000000..47f0df2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioner.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A task interface to provision a volume to expected state.
+ *
+ * <p>It declares no methods of its own: an implementation is a
+ * {@link Callable} whose {@code call()} returns the
+ * {@link VolumeProvisioningResults} of the provisioning attempt.
+ */
+@Private
+@Unstable
+public interface VolumeProvisioner extends Callable<VolumeProvisioningResults> 
{
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java
new file mode 100644
index 0000000..657fa6c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningResults.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import com.google.gson.JsonObject;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Aggregated outcome of provisioning a set of volumes, keyed by volume ID.
+ */
+public class VolumeProvisioningResults {
+
+  private final Map<VolumeId, VolumeProvisioningResult> resultMap;
+
+  public VolumeProvisioningResults() {
+    this.resultMap = new HashMap<>();
+  }
+
+  /**
+   * Records the state a volume ended up in. A later call for the same
+   * volume ID replaces the earlier entry.
+   *
+   * @param volumeId ID of the volume
+   * @param state state of the volume when the result was collected
+   */
+  public void addResult(VolumeId volumeId, VolumeState state) {
+    VolumeProvisioningResult entry =
+        new VolumeProvisioningResult(volumeId, state);
+    this.resultMap.put(volumeId, entry);
+  }
+
+  /**
+   * @return true only if at least one result was recorded and every
+   *         recorded result is successful
+   */
+  public boolean isSuccess() {
+    if (resultMap.isEmpty()) {
+      return false;
+    }
+    for (VolumeProvisioningResult entry : resultMap.values()) {
+      if (!entry.isSuccess()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return a JSON string holding the number of recorded volumes and, for
+   *         each unsuccessful volume, its ID mapped to its state name
+   */
+  public String getBriefMessage() {
+    JsonObject failedStates = new JsonObject();
+    resultMap.values().stream()
+        .filter(entry -> !entry.isSuccess())
+        .forEach(entry -> failedStates.addProperty(
+            entry.getVolumeId().toString(),
+            entry.getVolumeState().name()));
+
+    JsonObject summary = new JsonObject();
+    summary.addProperty("TotalVolumes", resultMap.size());
+    summary.add("failedVolumesStates", failedStates);
+    return summary.toString();
+  }
+
+  /**
+   * Result for a single volume; it counts as successful when the volume
+   * state is {@link VolumeState#NODE_READY}.
+   */
+  static class VolumeProvisioningResult {
+
+    private final VolumeId volumeId;
+    private final VolumeState volumeState;
+    private final boolean success;
+
+    VolumeProvisioningResult(VolumeId volumeId, VolumeState state) {
+      this.volumeId = volumeId;
+      this.volumeState = state;
+      this.success = (VolumeState.NODE_READY == state);
+    }
+
+    public VolumeId getVolumeId() {
+      return volumeId;
+    }
+
+    public VolumeState getVolumeState() {
+      return volumeState;
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java
new file mode 100644
index 0000000..eb35431
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/VolumeProvisioningTask.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * A provisioning task encapsulates all the logic required by a storage system
+ * to provision a volume. This class is the common implementation; it may be
+ * overridden if the provisioning behavior of a certain storage system does
+ * not completely align with this implementation.
+ */
+public class VolumeProvisioningTask implements VolumeProvisioner {
+
+  private static final Logger LOG =  LoggerFactory
+      .getLogger(VolumeProvisioningTask.class);
+
+  private List<Volume> volumes;
+
+  /**
+   * @param volumes the volumes this task provisions
+   */
+  public VolumeProvisioningTask(List<Volume> volumes) {
+    this.volumes = volumes;
+  }
+
+  /**
+   * @return the volumes this task was created with
+   */
+  public List<Volume> getVolumes() {
+    return volumes;
+  }
+
+  /**
+   * Sends a validate event followed by a controller-publish event to every
+   * volume, then reports the state each volume is in afterwards.
+   *
+   * @return the collected per-volume states
+   */
+  @Override
+  public VolumeProvisioningResults call() throws Exception {
+    // First pass: drive every volume through its provisioning events.
+    for (Volume volume : volumes) {
+      LOG.info("Provisioning volume : {}", volume.getVolumeId().toString());
+      volume.handle(new ValidateVolumeEvent(volume));
+      volume.handle(new ControllerPublishVolumeEvent(volume));
+    }
+
+    // Second pass: collect results only after all events were delivered.
+    VolumeProvisioningResults results = new VolumeProvisioningResults();
+    for (Volume volume : volumes) {
+      results.addResult(volume.getVolumeId(), volume.getVolumeState());
+    }
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java
new file mode 100644
index 0000000..92b4bdf
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/provisioner/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the volume provisioning classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java
new file mode 100644
index 0000000..3f9bc93
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage storage volumes in YARN.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java
new file mode 100644
index 0000000..5132864
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeCapabilityRange.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for volume capability.
+ *
+ * <p>Exercises the {@link VolumeCapabilityRange} builder: which combinations
+ * of arguments are rejected with an {@link InvalidVolumeException} and that
+ * a successfully built range returns the values it was given.
+ */
+public class TestVolumeCapabilityRange {
+
+  /** A negative minimum capacity must be rejected by build(). */
+  @Test(expected = InvalidVolumeException.class)
+  public void testInvalidMinCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .minCapacity(-1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+  }
+
+  /** Leaving out the minimum capacity must be rejected by build(). */
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingMinCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+  }
+
+  /** Leaving out the unit must be rejected by build(). */
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingUnit() throws InvalidVolumeException {
+    VolumeCapabilityRange.newBuilder()
+        .minCapacity(0L)
+        .maxCapacity(5L)
+        .build();
+  }
+
+  /**
+   * A range built with min, max and unit returns exactly those values;
+   * a minimum capacity of zero is accepted.
+   */
+  @Test
+  public void testGetVolumeCapability() throws InvalidVolumeException {
+    VolumeCapabilityRange vc = VolumeCapabilityRange.newBuilder()
+        .minCapacity(0L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+
+    Assert.assertEquals(0L, vc.getMinCapacity());
+    Assert.assertEquals(5L, vc.getMaxCapacity());
+    Assert.assertEquals("Gi", vc.getUnit());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
new file mode 100644
index 0000000..18c23e8
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeLifecycle.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ControllerPublishVolumeEvent;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.ValidateVolumeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * Test cases for volume lifecycle management.
+ *
+ * <p>Each test builds a {@link VolumeImpl}, feeds it validate and/or
+ * controller-publish events and asserts the resulting {@link VolumeState}.
+ */
+public class TestVolumeLifecycle {
+
+  /**
+   * A freshly built volume starts in NEW and moves to VALIDATED after a
+   * validate event.
+   */
+  @Test
+  public void testValidation() throws InvalidVolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder.newBuilder()
+        .volumeId("test_vol_00000001")
+        .maxCapability(5L)
+        .unit("Gi")
+        .mountPoint("/path/to/mount")
+        .driverName("test-driver-name")
+        .build();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+  }
+
+  /**
+   * When the client fails to validate the volume, the volume never reaches
+   * VALIDATED and ends up in UNAVAILABLE.
+   */
+  @Test
+  public void testValidationFailure() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    // Simulate a failed API call to the adaptor
+    doThrow(new VolumeException("failed")).when(mockedClient).validateVolume();
+    volume.handle(new ValidateVolumeEvent(volume));
+
+    try {
+      // The VALIDATED state must never be reached, so waitFor is expected
+      // to give up and throw instead of returning normally.
+      GenericTestUtils.waitFor(() ->
+          volume.getVolumeState() == VolumeState.VALIDATED, 10, 50);
+      Assert.fail("Validate state not reached,"
+          + " it should keep waiting until timeout");
+    } catch (Exception e) {
+      // Assert.fail raises an AssertionError, which is not an Exception,
+      // so a failure signalled above is not swallowed by this handler.
+      Assert.assertTrue(e instanceof TimeoutException);
+      Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+    }
+  }
+
+  /**
+   * A validate event on an already VALIDATED volume keeps the state and
+   * does not call the client a second time.
+   */
+  @Test
+  public void testValidated() throws InvalidVolumeException {
+    AtomicInteger validatedTimes = new AtomicInteger(0);
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = new CsiAdaptorClientProtocol() {
+      @Override
+      public void validateVolume() {
+        validatedTimes.incrementAndGet();
+      }
+
+      @Override
+      public void controllerPublishVolume() {
+        // do nothing
+      }
+    };
+    // The client counts how many times validateVolume has been invoked
+    volume.setClient(mockedClient);
+
+    // NEW -> VALIDATED
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+    Assert.assertEquals(1, validatedTimes.get());
+
+    // VALIDATED -> VALIDATED
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+    // the counter is unchanged: the client was not invoked again
+    Assert.assertEquals(1, validatedTimes.get());
+  }
+
+  /**
+   * A volume whose validation failed stays UNAVAILABLE while validation
+   * keeps failing, and moves to VALIDATED once validation succeeds.
+   */
+  @Test
+  public void testUnavailableState() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE
+    doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> UNAVAILABLE
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> VALIDATED
+    // Re-stub the mock so that validation no longer throws
+    doNothing().when(mockedClient).validateVolume();
+    volume.setClient(mockedClient);
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.VALIDATED, volume.getVolumeState());
+  }
+
+  /**
+   * A controller-publish event on an UNAVAILABLE volume neither calls the
+   * client nor changes the state.
+   */
+  @Test
+  public void testPublishUnavailableVolume() throws VolumeException {
+    VolumeImpl volume = (VolumeImpl) VolumeBuilder
+        .newBuilder().volumeId("test_vol_00000001").build();
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    volume.setClient(mockedClient);
+
+    // NEW -> UNAVAILABLE (on validateVolume)
+    doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+    Assert.assertEquals(VolumeState.NEW, volume.getVolumeState());
+    volume.handle(new ValidateVolumeEvent(volume));
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+
+    // UNAVAILABLE -> UNAVAILABLE (on publishVolume)
+    volume.handle(new ControllerPublishVolumeEvent(volume));
+    // controller publish is not called since the state is UNAVAILABLE
+    verify(mockedClient, times(0)).controllerPublishVolume();
+    // state remains to UNAVAILABLE
+    Assert.assertEquals(VolumeState.UNAVAILABLE, volume.getVolumeState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
new file mode 100644
index 0000000..38dbe03
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeMetaData.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+/**
+ * Verifies how volume specifications are assembled through the
+ * {@link VolumeMetaData} builder, which fields the builder insists on,
+ * how the specification is rendered by {@code toString()}, and that
+ * {@link VolumeId} can be used as a hash key.
+ */
+public class TestVolumeMetaData {
+
+  private static final String VOLUME_ID = "id-000001";
+  private static final String VOLUME_NAME = "volume-name";
+  private static final String DRIVER_NAME = "csi-demo-driver";
+  private static final String MOUNT_POINT = "/mnt/data";
+
+  /**
+   * Builds the capability range shared by the tests:
+   * min capacity 1, max capacity 5, unit "Gi".
+   */
+  private static VolumeCapabilityRange newCapability()
+      throws InvalidVolumeException {
+    return VolumeCapabilityRange.newBuilder()
+        .minCapacity(1L)
+        .maxCapacity(5L)
+        .unit("Gi")
+        .build();
+  }
+
+  /** Parses the {@code toString()} output of the given spec as JSON. */
+  private static JsonObject toJson(VolumeMetaData spec) {
+    JsonParser parser = new JsonParser();
+    JsonElement root = parser.parse(spec.toString());
+    return root.getAsJsonObject();
+  }
+
+  /** Asserts that the JSON member {@code key} holds {@code expected}. */
+  private static void assertJsonString(String expected, JsonObject json,
+      String key) {
+    Assert.assertEquals(expected, json.get(key).getAsString());
+  }
+
+  /**
+   * Asserts the capability range, driver name and mount point that every
+   * successfully built spec in this class is created with.
+   */
+  private static void assertCommonFields(VolumeMetaData spec) {
+    Assert.assertEquals(1L, spec.getVolumeCapabilityRange().getMinCapacity());
+    Assert.assertEquals(5L, spec.getVolumeCapabilityRange().getMaxCapacity());
+    Assert.assertEquals("Gi", spec.getVolumeCapabilityRange().getUnit());
+    Assert.assertEquals(DRIVER_NAME, spec.getDriverName());
+    Assert.assertEquals(MOUNT_POINT, spec.getMountPoint());
+  }
+
+  /**
+   * A spec built with a volume id but no volume name is reported as a
+   * provisioned volume, and its JSON form carries no volume name.
+   */
+  @Test
+  public void testPreprovisionedVolume() throws InvalidVolumeException {
+    VolumeCapabilityRange range = newCapability();
+
+    // With a volume id present, the volume name may be left out
+    VolumeMetaData spec = VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId(VOLUME_ID))
+        .capability(range)
+        .driverName(DRIVER_NAME)
+        .mountPoint(MOUNT_POINT)
+        .build();
+
+    Assert.assertEquals(new VolumeId(VOLUME_ID), spec.getVolumeId());
+    assertCommonFields(spec);
+    Assert.assertNull(spec.getVolumeName());
+    Assert.assertTrue(spec.isProvisionedVolume());
+
+    // Check the JSON produced by toString()
+    JsonObject json = toJson(spec);
+    Assert.assertNotNull(json);
+    Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_NAME));
+    assertJsonString(VOLUME_ID, json, CsiConstants.CSI_VOLUME_ID);
+    assertJsonString(DRIVER_NAME, json, CsiConstants.CSI_DRIVER_NAME);
+    assertJsonString(MOUNT_POINT, json, CsiConstants.CSI_VOLUME_MOUNT);
+  }
+
+  /**
+   * A spec built with a volume name but no volume id is not reported as a
+   * provisioned volume, and its JSON form carries no volume id.
+   */
+  @Test
+  public void testDynamicalProvisionedVolume() throws InvalidVolumeException {
+    VolumeCapabilityRange range = newCapability();
+
+    // With a volume name present, the volume id may be left out
+    VolumeMetaData spec = VolumeMetaData.newBuilder()
+        .volumeName(VOLUME_NAME)
+        .capability(range)
+        .driverName(DRIVER_NAME)
+        .mountPoint(MOUNT_POINT)
+        .build();
+    Assert.assertNotNull(spec);
+
+    Assert.assertEquals(VOLUME_NAME, spec.getVolumeName());
+    assertCommonFields(spec);
+    Assert.assertFalse(spec.isProvisionedVolume());
+
+    // Check the JSON produced by toString()
+    JsonObject json = toJson(spec);
+    Assert.assertNotNull(json);
+    Assert.assertNull(json.get(CsiConstants.CSI_VOLUME_ID));
+    assertJsonString(VOLUME_NAME, json, CsiConstants.CSI_VOLUME_NAME);
+    assertJsonString(DRIVER_NAME, json, CsiConstants.CSI_DRIVER_NAME);
+    assertJsonString(MOUNT_POINT, json, CsiConstants.CSI_VOLUME_MOUNT);
+  }
+
+  /** Building a spec without a mount point must be rejected. */
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingMountpoint() throws InvalidVolumeException {
+    VolumeCapabilityRange range = newCapability();
+
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId(VOLUME_ID))
+        .capability(range)
+        .driverName(DRIVER_NAME)
+        .build();
+  }
+
+  /** Building a spec without a CSI driver name must be rejected. */
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingCsiDriverName() throws InvalidVolumeException {
+    VolumeCapabilityRange range = newCapability();
+
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId(VOLUME_ID))
+        .capability(range)
+        .mountPoint(MOUNT_POINT)
+        .build();
+  }
+
+  /** Building a spec without a capability range must be rejected. */
+  @Test(expected = InvalidVolumeException.class)
+  public void testMissingVolumeCapability() throws InvalidVolumeException {
+    VolumeMetaData.newBuilder()
+        .volumeId(new VolumeId(VOLUME_ID))
+        .driverName(DRIVER_NAME)
+        .mountPoint(MOUNT_POINT)
+        .build();
+  }
+
+  /**
+   * Two {@link VolumeId}s created from the same string are equal, share a
+   * hash code and address the same {@link HashMap} entry; ids created
+   * from different strings are not equal.
+   */
+  @Test
+  public void testVolumeId() {
+    VolumeId first = new VolumeId("test00001");
+    VolumeId sameAsFirst = new VolumeId("test00001");
+    VolumeId other = new VolumeId("test00002");
+
+    Assert.assertEquals(first, sameAsFirst);
+    Assert.assertEquals(first.hashCode(), sameAsFirst.hashCode());
+    Assert.assertNotEquals(first, other);
+
+    HashMap<VolumeId, String> lookup = new HashMap<>();
+    lookup.put(first, "1");
+    Assert.assertEquals(1, lookup.size());
+    Assert.assertEquals("1", lookup.get(sameAsFirst));
+    // Putting through the equal key overwrites instead of adding an entry
+    lookup.put(sameAsFirst, "2");
+    Assert.assertEquals(1, lookup.size());
+    Assert.assertEquals("2", lookup.get(sameAsFirst));
+    Assert.assertEquals("2", lookup.get(new VolumeId("test00001")));
+
+    Assert.assertNotEquals(first, other);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
new file mode 100644
index 0000000..d6f9d92
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import org.apache.hadoop.yarn.server.volume.csi.CsiConstants;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.VolumeProvisioningException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Test cases for volume processor.
+ *
+ * Each test starts a {@link MockRM} configured with
+ * {@link VolumeAMSProcessor} as an application master service processor,
+ * registers {@link #NUM_OF_NMS} mock node managers and then sends
+ * allocate requests carrying a CSI volume resource.
+ */
+public class TestVolumeProcessor {
+
+  private static final int GB = 1024;
+  private static final int NUM_OF_NMS = 4;
+  private static final String VOLUME_RESOURCE_NAME = "yarn.io/csi-volume";
+  private static final String TEST_VOLUME_ID = "test-vol-000001";
+
+  private YarnConfiguration conf;
+  private RMNodeLabelsManager mgr;
+  private MockRM rm;
+  private MockNM[] mockNMS;
+  private RMNode[] rmNodes;
+  // resource-types.xml is created under the target/test-classes/ dir;
+  // it must be deleted once the test is done so that it is not picked up
+  // by other UT classes.
+  private File resourceTypesFile = null;
+
+  /**
+   * Writes the temporary resource-types.xml, then starts a MockRM that
+   * uses the capacity scheduler with the volume processor enabled and
+   * registers the mock node managers.
+   */
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    resourceTypesFile = new File(conf.getClassLoader()
+        .getResource(".").getPath(), "resource-types.xml");
+    writeTmpResourceTypesFile(resourceTypesFile);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    conf.set("yarn.scheduler.capacity.resource-calculator",
+        "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+    conf.set(CapacitySchedulerConfiguration.PREFIX
+        + CapacitySchedulerConfiguration.ROOT + ".default.ordering-policy",
+        "fair");
+    conf.set(YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS,
+        VolumeAMSProcessor.class.getName());
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    rm = new MockRM(conf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    mockNMS = new MockNM[NUM_OF_NMS];
+    rmNodes = new RMNode[NUM_OF_NMS];
+    // Loop bound follows the array size instead of a hard-coded literal.
+    for (int i = 0; i < NUM_OF_NMS; i++) {
+      mockNMS[i] = rm.registerNode("192.168.0." + i + ":1234", 10 * GB);
+      rmNodes[i] = rm.getRMContext().getRMNodes().get(mockNMS[i].getNodeId());
+    }
+  }
+
+  /**
+   * Stops the MockRM and removes the temporary resource-types.xml.
+   * Doing the stop here, rather than at the end of each test body, means
+   * the RM is also shut down when a test fails or times out.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (rm != null) {
+        rm.stop();
+      }
+    } finally {
+      // Delete the file even if stopping the RM throws.
+      if (resourceTypesFile != null && resourceTypesFile.exists()) {
+        resourceTypesFile.delete();
+      }
+    }
+  }
+
+  /**
+   * Writes a configuration file that declares the volume resource type
+   * with "Mi" as its unit.
+   *
+   * @param tmpFile file to write the configuration XML to
+   * @throws IOException if the file cannot be written
+   */
+  private void writeTmpResourceTypesFile(File tmpFile) throws IOException {
+    try (FileWriter fw = new FileWriter(tmpFile)) {
+      Configuration yarnConf = new YarnConfiguration();
+      yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME);
+      yarnConf.set("yarn.resource-types."
+          + VOLUME_RESOURCE_NAME + ".units", "Mi");
+      yarnConf.writeXml(fw);
+    }
+  }
+
+  /**
+   * Builds an allocate request with a single scheduling request asking
+   * for one allocation of 1024 memory, 1 vcore and a 1024 Mi volume
+   * resource tagged as a CSI volume.
+   *
+   * @param volumeAttributes attributes attached to the volume resource
+   * @return the allocate request wrapping the scheduling request
+   */
+  private static AllocateRequest newVolumeAllocateRequest(
+      ImmutableMap<String, String> volumeAttributes) {
+    Resource resource = Resource.newInstance(1024, 1);
+    ResourceInformation volumeResource = ResourceInformation
+        .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024,
+            ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE,
+            ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG),
+            volumeAttributes);
+    resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource);
+    SchedulingRequest sc = SchedulingRequest
+        .newBuilder().allocationRequestId(0L)
+        .resourceSizing(ResourceSizing.newInstance(1, resource))
+        .build();
+    return AllocateRequest.newBuilder()
+        .schedulingRequests(Arrays.asList(sc))
+        .build();
+  }
+
+  /**
+   * Requests a volume by id and heartbeats until the volume manager
+   * reports the volume as NODE_READY. The wait is bounded only by the
+   * JUnit timeout.
+   */
+  @Test (timeout = 10000L)
+  public void testVolumeProvisioning() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+    AllocateRequest ar = newVolumeAllocateRequest(ImmutableMap.of(
+        CsiConstants.CSI_VOLUME_ID, TEST_VOLUME_ID,
+        CsiConstants.CSI_DRIVER_NAME, "hostpath",
+        CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"));
+
+    am1.allocate(ar);
+    VolumeStates volumeStates =
+        rm.getRMContext().getVolumeManager().getVolumeStates();
+    Assert.assertNotNull(volumeStates);
+    VolumeState volumeState = VolumeState.NEW;
+    while (volumeState != VolumeState.NODE_READY) {
+      Volume volume = volumeStates
+          .getVolume(new VolumeId(TEST_VOLUME_ID));
+      if (volume != null) {
+        volumeState = volume.getVolumeState();
+      }
+      am1.doHeartbeat();
+      mockNMS[0].nodeHeartbeat(true);
+      Thread.sleep(500);
+    }
+  }
+
+  /**
+   * An allocate request whose volume resource has no volume id must be
+   * rejected with an {@link InvalidVolumeException}.
+   */
+  @Test (timeout = 30000L)
+  public void testInvalidRequest() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+    AllocateRequest ar = newVolumeAllocateRequest(ImmutableMap.of(
+        // volume ID is missing, only a volume name is given
+        CsiConstants.CSI_VOLUME_NAME, TEST_VOLUME_ID,
+        CsiConstants.CSI_DRIVER_NAME, "hostpath",
+        CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"));
+
+    try {
+      am1.allocate(ar);
+      Assert.fail("allocate should fail because invalid request received");
+    } catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e,
+          e instanceof InvalidVolumeException);
+    }
+  }
+
+  /**
+   * When the injected adaptor client fails volume validation, allocate
+   * must fail with a {@link VolumeProvisioningException}.
+   */
+  @Test (timeout = 30000L)
+  public void testProvisioningFailures() throws Exception {
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]);
+
+    CsiAdaptorClientProtocol mockedClient = Mockito
+        .mock(CsiAdaptorClientProtocol.class);
+    // inject adaptor client
+    rm.getRMContext().getVolumeManager().setClient(mockedClient);
+    Mockito.doThrow(new VolumeException("failed")).when(mockedClient)
+        .validateVolume();
+
+    AllocateRequest ar = newVolumeAllocateRequest(ImmutableMap.of(
+        CsiConstants.CSI_VOLUME_ID, TEST_VOLUME_ID,
+        CsiConstants.CSI_DRIVER_NAME, "hostpath",
+        CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data"));
+
+    try {
+      am1.allocate(ar);
+      Assert.fail("allocate should fail");
+    } catch (Exception e) {
+      Assert.assertTrue("Unexpected exception: " + e,
+          e instanceof VolumeProvisioningException);
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to