YARN-8902. [CSI] Add volume manager that manages CSI volume lifecycle. 
Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4e728444
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4e728444
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4e728444

Branch: refs/heads/HDFS-13891
Commit: 4e7284443e6cf8dac3cd5d2581730c87ae6ffb55
Parents: b5ec85d
Author: Sunil G <sun...@apache.org>
Authored: Mon Nov 12 11:57:02 2018 +0530
Committer: Sunil G <sun...@apache.org>
Committed: Mon Nov 12 11:57:02 2018 +0530

----------------------------------------------------------------------
 .../volume/csi/CsiAdaptorClientProtocol.java    |  34 +++
 .../yarn/server/volume/csi/CsiConstants.java    |  37 +++
 .../volume/csi/VolumeCapabilityRange.java       | 107 ++++++++
 .../hadoop/yarn/server/volume/csi/VolumeId.java |  59 +++++
 .../yarn/server/volume/csi/VolumeMetaData.java  | 227 +++++++++++++++++
 .../csi/exception/InvalidVolumeException.java   |  28 +++
 .../volume/csi/exception/VolumeException.java   |  34 +++
 .../exception/VolumeProvisioningException.java  |  32 +++
 .../volume/csi/exception/package-info.java      |  27 ++
 .../yarn/server/volume/csi/package-info.java    |  27 ++
 .../resourcemanager/RMActiveServiceContext.java |  14 ++
 .../yarn/server/resourcemanager/RMContext.java  |   5 +
 .../server/resourcemanager/RMContextImpl.java   |  12 +
 .../server/resourcemanager/ResourceManager.java |  14 ++
 .../volume/csi/CsiAdaptorClient.java            |  36 +++
 .../volume/csi/VolumeBuilder.java               | 106 ++++++++
 .../volume/csi/VolumeManager.java               |  63 +++++
 .../volume/csi/VolumeManagerImpl.java           | 108 ++++++++
 .../volume/csi/VolumeStates.java                |  60 +++++
 .../csi/event/ControllerPublishVolumeEvent.java |  30 +++
 .../volume/csi/event/ValidateVolumeEvent.java   |  30 +++
 .../volume/csi/event/VolumeEvent.java           |  43 ++++
 .../volume/csi/event/VolumeEventType.java       |  29 +++
 .../volume/csi/event/package-info.java          |  27 ++
 .../volume/csi/lifecycle/Volume.java            |  37 +++
 .../volume/csi/lifecycle/VolumeImpl.java        | 199 +++++++++++++++
 .../volume/csi/lifecycle/VolumeState.java       |  35 +++
 .../volume/csi/lifecycle/package-info.java      |  27 ++
 .../volume/csi/package-info.java                |  27 ++
 .../csi/processor/VolumeAMSProcessor.java       | 158 ++++++++++++
 .../volume/csi/processor/package-info.java      |  27 ++
 .../csi/provisioner/VolumeProvisioner.java      |  32 +++
 .../provisioner/VolumeProvisioningResults.java  |  87 +++++++
 .../csi/provisioner/VolumeProvisioningTask.java |  66 +++++
 .../volume/csi/provisioner/package-info.java    |  27 ++
 .../resourcemanager/volume/package-info.java    |  27 ++
 .../volume/csi/TestVolumeCapabilityRange.java   |  67 +++++
 .../volume/csi/TestVolumeLifecycle.java         | 161 ++++++++++++
 .../volume/csi/TestVolumeMetaData.java          | 178 +++++++++++++
 .../volume/csi/TestVolumeProcessor.java         | 250 +++++++++++++++++++
 40 files changed, 2594 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
new file mode 100644
index 0000000..b894d4e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiAdaptorClientProtocol.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+
+/**
+ * Protocol for the CSI adaptor.
+ */
+@Private
+@Unstable
+public interface CsiAdaptorClientProtocol {
+
+  void validateVolume() throws VolumeException;
+
+  void controllerPublishVolume() throws VolumeException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java
new file mode 100644
index 0000000..fcf9cf4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/CsiConstants.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+/**
+ * CSI constants.
+ */
+public final class CsiConstants {
+
+  private CsiConstants() {
+    // Hide the constructor for this constant class.
+  }
+
+  public static final String CSI_VOLUME_NAME = "volume.name";
+  public static final String CSI_VOLUME_ID = "volume.id";
+  public static final String CSI_VOLUME_CAPABILITY = "volume.capability";
+  public static final String CSI_DRIVER_NAME = "driver.name";
+  public static final String CSI_VOLUME_MOUNT = "volume.mount";
+  public static final String CSI_VOLUME_ACCESS_MODE =  "volume.accessMode";
+
+  public static final String CSI_VOLUME_RESOURCE_TAG = "system:csi-volume";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java
new file mode 100644
index 0000000..e4775fe
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeCapabilityRange.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import com.google.common.base.Strings;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+/**
+ * Volume capability range that specified in a volume resource request,
+ * this range defines the desired min/max capacity.
+ */
+public final class VolumeCapabilityRange {
+
+  private final long minCapacity;
+  private final long maxCapacity;
+  private final String unit;
+
+  private VolumeCapabilityRange(long minCapacity,
+      long maxCapacity, String unit) {
+    this.minCapacity = minCapacity;
+    this.maxCapacity = maxCapacity;
+    this.unit = unit;
+  }
+
+  public long getMinCapacity() {
+    return minCapacity;
+  }
+
+  public long getMaxCapacity() {
+    return maxCapacity;
+  }
+
+  public String getUnit() {
+    return unit;
+  }
+
+  @Override
+  public String toString() {
+    return "MinCapability: " + minCapacity + unit
+        + ", MaxCapability: " + maxCapacity + unit;
+  }
+
+  public static VolumeCapabilityBuilder newBuilder() {
+    return new VolumeCapabilityBuilder();
+  }
+
+  /**
+   * The builder used to build a VolumeCapabilityRange instance.
+   */
+  public static class VolumeCapabilityBuilder {
+    // An invalid default value implies this value must be set
+    private long minCap = -1L;
+    private long maxCap = Long.MAX_VALUE;
+    private String unit;
+
+    public VolumeCapabilityBuilder minCapacity(long minCapacity) {
+      this.minCap = minCapacity;
+      return this;
+    }
+
+    public VolumeCapabilityBuilder maxCapacity(long maxCapacity) {
+      this.maxCap = maxCapacity;
+      return this;
+    }
+
+    public VolumeCapabilityBuilder unit(String capacityUnit) {
+      this.unit = capacityUnit;
+      return this;
+    }
+
+    public VolumeCapabilityRange build() throws InvalidVolumeException {
+      VolumeCapabilityRange
+          capability = new VolumeCapabilityRange(minCap, maxCap, unit);
+      validateCapability(capability);
+      return capability;
+    }
+
+    private void validateCapability(VolumeCapabilityRange capability)
+        throws InvalidVolumeException {
+      if (capability.getMinCapacity() < 0) {
+        throw new InvalidVolumeException("Invalid volume capability range,"
+            + " minimal capability must not be less than 0. Capability: "
+            + capability.toString());
+      }
+      if (Strings.isNullOrEmpty(capability.getUnit())) {
+        throw new InvalidVolumeException("Invalid volume capability range,"
+            + " capability unit is missing. Capability: "
+            + capability.toString());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java
new file mode 100644
index 0000000..8acc95e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeId.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Unique ID for a volume. This may or may not come from a storage system,
+ * YARN depends on this ID to recognized volumes and manage their states.
+ */
+public class VolumeId {
+
+  private final String volumeId;
+
+  public VolumeId(String volumeId) {
+    this.volumeId = volumeId;
+  }
+
+  public String getId() {
+    return this.volumeId;
+  }
+
+  @Override
+  public String toString() {
+    return this.volumeId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof VolumeId)) {
+      return false;
+    }
+    return StringUtils.equalsIgnoreCase(volumeId,
+        ((VolumeId) obj).getId());
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hc = new HashCodeBuilder();
+    hc.append(volumeId);
+    return hc.toHashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java
new file mode 100644
index 0000000..7f2c92c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/VolumeMetaData.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import com.google.common.base.Strings;
+import com.google.gson.JsonObject;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * VolumeMetaData defines all valid info for a CSI compatible volume.
+ */
+public class VolumeMetaData {
+
+  private VolumeId volumeId;
+  private String volumeName;
+  private VolumeCapabilityRange volumeCapabilityRange;
+  private String driverName;
+  private String mountPoint;
+
+  private void setVolumeId(VolumeId volumeId) {
+    this.volumeId = volumeId;
+  }
+
+  private void setVolumeName(String volumeName) {
+    this.volumeName = volumeName;
+  }
+
+  private void setVolumeCapabilityRange(VolumeCapabilityRange capability) {
+    this.volumeCapabilityRange = capability;
+  }
+
+  private void setDriverName(String driverName) {
+    this.driverName = driverName;
+  }
+
+  private void setMountPoint(String mountPoint) {
+    this.mountPoint = mountPoint;
+  }
+
+  public boolean isProvisionedVolume() {
+    return this.volumeId != null;
+  }
+
+  public VolumeId getVolumeId() {
+    return volumeId;
+  }
+
+  public String getVolumeName() {
+    return volumeName;
+  }
+
+  public VolumeCapabilityRange getVolumeCapabilityRange() {
+    return volumeCapabilityRange;
+  }
+
+  public String getDriverName() {
+    return driverName;
+  }
+
+  public String getMountPoint() {
+    return mountPoint;
+  }
+
+  public static VolumeSpecBuilder newBuilder() {
+    return new VolumeSpecBuilder();
+  }
+
+  public static List<VolumeMetaData> fromResource(
+      ResourceInformation resourceInfo) throws InvalidVolumeException {
+    List<VolumeMetaData> volumeMetaData = new ArrayList<>();
+    if (resourceInfo != null) {
+      if (resourceInfo.getTags() != null && resourceInfo.getTags()
+          .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) {
+        VolumeSpecBuilder builder = VolumeMetaData.newBuilder();
+        // Volume ID
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_ID)) {
+          String id = resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_ID);
+          builder.volumeId(new VolumeId(id));
+        }
+        // Volume name
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_NAME)) {
+          builder.volumeName(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_NAME));
+        }
+        // CSI driver name
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_DRIVER_NAME)) {
+          builder.driverName(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_DRIVER_NAME));
+        }
+        // Mount path
+        if (resourceInfo.getAttributes()
+            .containsKey(CsiConstants.CSI_VOLUME_MOUNT)) {
+          builder.mountPoint(resourceInfo.getAttributes()
+              .get(CsiConstants.CSI_VOLUME_MOUNT));
+        }
+        // Volume capability
+        VolumeCapabilityRange volumeCapabilityRange =
+            VolumeCapabilityRange.newBuilder()
+                .minCapacity(resourceInfo.getValue())
+                .unit(resourceInfo.getUnits())
+                .build();
+        builder.capability(volumeCapabilityRange);
+        volumeMetaData.add(builder.build());
+      }
+    }
+    return volumeMetaData;
+  }
+
+  @Override
+  public String toString() {
+    JsonObject json = new JsonObject();
+    if (!Strings.isNullOrEmpty(volumeName)) {
+      json.addProperty(CsiConstants.CSI_VOLUME_NAME, volumeName);
+    }
+    if (volumeId != null) {
+      json.addProperty(CsiConstants.CSI_VOLUME_ID, volumeId.toString());
+    }
+    if (volumeCapabilityRange != null) {
+      json.addProperty(CsiConstants.CSI_VOLUME_CAPABILITY,
+          volumeCapabilityRange.toString());
+    }
+    if (!Strings.isNullOrEmpty(driverName)) {
+      json.addProperty(CsiConstants.CSI_DRIVER_NAME, driverName);
+    }
+    if (!Strings.isNullOrEmpty(mountPoint)) {
+      json.addProperty(CsiConstants.CSI_VOLUME_MOUNT, mountPoint);
+    }
+    return json.toString();
+  }
+
+  /**
+   * The builder used to build a VolumeMetaData instance.
+   */
+  public static class VolumeSpecBuilder {
+    // @CreateVolumeRequest
+    // The suggested name for the storage space.
+    private VolumeId volumeId;
+    private String volumeName;
+    private VolumeCapabilityRange volumeCapabilityRange;
+    private String driverName;
+    private String mountPoint;
+
+    public VolumeSpecBuilder volumeId(VolumeId volumeId) {
+      this.volumeId = volumeId;
+      return this;
+    }
+
+    public VolumeSpecBuilder volumeName(String name) {
+      this.volumeName = name;
+      return this;
+    }
+
+    public VolumeSpecBuilder driverName(String driverName) {
+      this.driverName = driverName;
+      return this;
+    }
+
+    public VolumeSpecBuilder mountPoint(String mountPoint) {
+      this.mountPoint = mountPoint;
+      return this;
+    }
+
+    public VolumeSpecBuilder capability(VolumeCapabilityRange capability) {
+      this.volumeCapabilityRange = capability;
+      return this;
+    }
+
+    public VolumeMetaData build() throws InvalidVolumeException {
+      VolumeMetaData spec = new VolumeMetaData();
+      spec.setVolumeId(volumeId);
+      spec.setVolumeName(volumeName);
+      spec.setVolumeCapabilityRange(volumeCapabilityRange);
+      spec.setDriverName(driverName);
+      spec.setMountPoint(mountPoint);
+      validate(spec);
+      return spec;
+    }
+
+    private void validate(VolumeMetaData spec) throws InvalidVolumeException {
+      // Volume name OR Volume ID must be set
+      if (Strings.isNullOrEmpty(spec.getVolumeName())
+          && spec.getVolumeId() == null) {
+        throw new InvalidVolumeException("Invalid volume, both volume name"
+            + " and ID are missing from the spec. Volume spec: "
+            + spec.toString());
+      }
+      // Volume capability must be set
+      if (spec.getVolumeCapabilityRange() == null) {
+        throw new InvalidVolumeException("Invalid volume, volume capability"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+      // CSI driver name must be set
+      if (Strings.isNullOrEmpty(spec.getDriverName())) {
+        throw new InvalidVolumeException("Invalid volume, the csi-driver name"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+      // Mount point must be set
+      if (Strings.isNullOrEmpty(spec.getMountPoint())) {
+        throw new InvalidVolumeException("Invalid volume, the mount point"
+            + " is missing. Volume spec: " + spec.toString());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java
new file mode 100644
index 0000000..0559e8a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/InvalidVolumeException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+/**
+ * This exception is thrown when a volume is found not valid.
+ */
+public class InvalidVolumeException extends VolumeException {
+
+  public InvalidVolumeException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java
new file mode 100644
index 0000000..60f9659
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Base class for all volume related exceptions.
+ */
+public class VolumeException extends YarnException {
+
+  public VolumeException(String message) {
+    super(message);
+  }
+
+  public VolumeException(String message, Exception e) {
+    super(message, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java
new file mode 100644
index 0000000..348eaf1
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/VolumeProvisioningException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+/**
+ * Exception throws when volume provisioning is failed.
+ */
+public class VolumeProvisioningException extends VolumeException {
+
+  public VolumeProvisioningException(String message) {
+    super(message);
+  }
+
+  public VolumeProvisioningException(String message, Exception e) {
+    super(message, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java
new file mode 100644
index 0000000..40737f0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/exception/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains volume related exception classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.volume.csi.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java
new file mode 100644
index 0000000..ef4ffef
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/volume/csi/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains common volume related classes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 3562078..f829a4c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
 import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -121,6 +122,7 @@ public class RMActiveServiceContext {
   private MultiNodeSortingManager<SchedulerNode> multiNodeSortingManager;
 
   private ProxyCAManager proxyCAManager;
+  private VolumeManager volumeManager;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -569,4 +571,16 @@ public class RMActiveServiceContext {
   public void setProxyCAManager(ProxyCAManager proxyCAManager) {
     this.proxyCAManager = proxyCAManager;
   }
+
+  @Private
+  @Unstable
+  public VolumeManager getVolumeManager() {
+    return this.volumeManager;
+  }
+
+  @Private
+  @Unstable
+  public void setVolumeManager(VolumeManager volumeManager) {
+    this.volumeManager = volumeManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index f06befe..4e9846c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 
 /**
  * Context of the ResourceManager.
@@ -193,4 +194,8 @@ public interface RMContext extends 
ApplicationMasterServiceContext {
   ProxyCAManager getProxyCAManager();
 
   void setProxyCAManager(ProxyCAManager proxyCAManager);
+
+  VolumeManager getVolumeManager();
+
+  void setVolumeManager(VolumeManager volumeManager);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 48f74d3..ab71134 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -62,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.util.Clock;
 
@@ -648,6 +649,17 @@ public class RMContextImpl implements RMContext {
   public void setProxyCAManager(ProxyCAManager proxyCAManager) {
     this.activeServiceContext.setProxyCAManager(proxyCAManager);
   }
+
+  @Override
+  public VolumeManager getVolumeManager() {
+    return activeServiceContext.getVolumeManager();
+  }
+
+  @Override
+  public void setVolumeManager(VolumeManager volumeManager) {
+    this.activeServiceContext.setVolumeManager(volumeManager);
+  }
+
   // Note: Read java doc before adding any services over here.
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index a89069a..69d50f2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -109,6 +109,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.ProxyCAManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
+import org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeManagerImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.processor.VolumeAMSProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -136,6 +139,7 @@ import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -845,6 +849,16 @@ public class ResourceManager extends CompositeService
         addIfService(systemServiceManager);
       }
 
+      // Add volume manager to RM context when it is necessary
+      String[] amsProcessorList = conf.getStrings(
+          YarnConfiguration.RM_APPLICATION_MASTER_SERVICE_PROCESSORS);
+      if (amsProcessorList != null&& Arrays.stream(amsProcessorList)
+          .anyMatch(s -> VolumeAMSProcessor.class.getName().equals(s))) {
+        VolumeManager volumeManager = new VolumeManagerImpl();
+        rmContext.setVolumeManager(volumeManager);
+        addIfService(volumeManager);
+      }
+
       super.serviceInit(conf);
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
new file mode 100644
index 0000000..043e7ae
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/CsiAdaptorClient.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+/**
+ * Client talks to CSI adaptor.
+ */
+public class CsiAdaptorClient implements CsiAdaptorClientProtocol {
+
+  @Override
+  public void validateVolume() throws VolumeException {
+    // TODO
+  }
+
+  @Override public void controllerPublishVolume() throws VolumeException {
+    // TODO
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java
new file mode 100644
index 0000000..af09373
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeBuilder.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeCapabilityRange;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import 
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Helper class to build a {@link Volume}.
+ */
+public final class VolumeBuilder {
+
+  private String id;
+  private String name;
+  private Long min;
+  private Long max;
+  private String unit;
+  private String driver;
+  private String mount;
+
+  private VolumeBuilder() {
+    // hide constructor
+  }
+
+  public static VolumeBuilder newBuilder() {
+    return new VolumeBuilder();
+  }
+
+  public VolumeBuilder volumeId(String volumeId) {
+    this.id = volumeId;
+    return this;
+  }
+
+  public VolumeBuilder volumeName(String volumeName) {
+    this.name = volumeName;
+    return this;
+  }
+
+  public VolumeBuilder minCapability(long minCapability) {
+    this.min = Long.valueOf(minCapability);
+    return this;
+  }
+
+  public VolumeBuilder maxCapability(long maxCapability) {
+    this.max = Long.valueOf(maxCapability);
+    return this;
+  }
+
+  public VolumeBuilder unit(String capUnit) {
+    this.unit = capUnit;
+    return this;
+  }
+
+  public VolumeBuilder driverName(String driverName) {
+    this.driver = driverName;
+    return this;
+  }
+
+  public VolumeBuilder mountPoint(String mountPoint) {
+    this.mount = mountPoint;
+    return this;
+  }
+
+  public Volume build() throws InvalidVolumeException {
+    VolumeId vid = new VolumeId(
+        Optional.ofNullable(id)
+            .orElse(UUID.randomUUID().toString()));
+
+    VolumeCapabilityRange volumeCap = VolumeCapabilityRange.newBuilder()
+        .minCapacity(Optional.ofNullable(min).orElse(0L))
+        .maxCapacity(Optional.ofNullable(max).orElse(Long.MAX_VALUE))
+        .unit(Optional.ofNullable(unit).orElse("Gi"))
+        .build();
+
+    VolumeMetaData meta = VolumeMetaData.newBuilder()
+        .capability(volumeCap)
+        .driverName(Optional.ofNullable(driver).orElse("test-driver"))
+        .mountPoint(Optional.ofNullable(mount).orElse("/mnt/data"))
+        .volumeName(name)
+        .volumeId(vid)
+        .build();
+    return new VolumeImpl(meta);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
new file mode 100644
index 0000000..5f2669d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManager.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Main interface for volume manager that manages all volumes.
+ * Volume manager talks to a CSI controller plugin to handle the
+ * volume operations before it is available to be published on
+ * any node manager.
+ */
+@Private
+@Unstable
+public interface VolumeManager {
+
+  /**
+   * @return all known volumes and their states.
+   */
+  @VisibleForTesting
+  VolumeStates getVolumeStates();
+
+  @VisibleForTesting
+  void setClient(CsiAdaptorClientProtocol client);
+
+  /**
+   * Start to supervise on a volume.
+   * @param volume
+   * @return the volume being managed by the manager.
+   */
+  Volume addOrGetVolume(Volume volume);
+
+  /**
+   * Execute volume provisioning tasks as backend threads.
+   * @param volumeProvisioningTask
+   * @param delaySecond
+   */
+  ScheduledFuture<VolumeProvisioningResults> schedule(
+      VolumeProvisioningTask volumeProvisioningTask, int delaySecond);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
new file mode 100644
index 0000000..5252f53
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeManagerImpl.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.VolumeImpl;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningResults;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.provisioner.VolumeProvisioningTask;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service manages all volumes.
+ */
+public class VolumeManagerImpl extends AbstractService
+    implements VolumeManager {
+
+  private static final Log LOG = LogFactory.getLog(VolumeManagerImpl.class);
+
+  private final VolumeStates volumeStates;
+  private ScheduledExecutorService provisioningExecutor;
+  private CsiAdaptorClientProtocol adaptorClient;
+
+  private final static int PROVISIONING_TASK_THREAD_POOL_SIZE = 10;
+
+  public VolumeManagerImpl() {
+    super(VolumeManagerImpl.class.getName());
+    this.volumeStates = new VolumeStates();
+    this.provisioningExecutor = Executors
+        .newScheduledThreadPool(PROVISIONING_TASK_THREAD_POOL_SIZE);
+    this.adaptorClient = new CsiAdaptorClient();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    provisioningExecutor.shutdown();
+    super.serviceStop();
+  }
+
+  @Override
+  public VolumeStates getVolumeStates() {
+    return this.volumeStates;
+  }
+
+  @Override
+  public Volume addOrGetVolume(Volume volume) {
+    if (volumeStates.getVolume(volume.getVolumeId()) != null) {
+      // volume already exists
+      return volumeStates.getVolume(volume.getVolumeId());
+    } else {
+      // add the volume and set the client
+      ((VolumeImpl) volume).setClient(adaptorClient);
+      this.volumeStates.addVolumeIfAbsent(volume);
+      return volume;
+    }
+  }
+
+  @VisibleForTesting
+  public void setClient(CsiAdaptorClientProtocol client) {
+    this.adaptorClient = client;
+  }
+
+  @Override
+  public ScheduledFuture<VolumeProvisioningResults> schedule(
+      VolumeProvisioningTask volumeProvisioningTask,
+      int delaySecond) {
+    LOG.info("Scheduling provision volume task (with delay "
+        + delaySecond + "s)," + " handling "
+        + volumeProvisioningTask.getVolumes().size()
+        + " volume provisioning");
+    return provisioningExecutor.schedule(volumeProvisioningTask,
+        delaySecond, TimeUnit.SECONDS);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java
new file mode 100644
index 0000000..fcef3f7
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/VolumeStates.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Volume manager states, including all managed volumes and their states.
+ */
+public class VolumeStates {
+
+  private final Map<VolumeId, Volume> volumeStates;
+
+  public VolumeStates() {
+    this.volumeStates = new ConcurrentHashMap<>();
+  }
+
+  public Volume getVolume(VolumeId volumeId) {
+    return volumeStates.get(volumeId);
+  }
+
+  /**
+   * Add volume if it is not yet added.
+   * If a new volume is added with a same {@link VolumeId}
+   * with a existing volume, existing volume will be returned.
+   * @param volume volume to add
+   * @return volume added or existing volume
+   */
+  public Volume addVolumeIfAbsent(Volume volume) {
+    if (volume.getVolumeId() != null) {
+      return volumeStates.putIfAbsent(volume.getVolumeId(), volume);
+    } else {
+      // for dynamical provisioned volumes,
+      // the volume ID might not be available at time being.
+      // we can makeup one with the combination of driver+volumeName+timestamp
+      // once the volume ID is generated, we should replace ID.
+      return volume;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java
new file mode 100644
index 0000000..3e294aa
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ControllerPublishVolumeEvent.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+
+/**
+ * Trigger controller publish.
+ */
+public class ControllerPublishVolumeEvent extends VolumeEvent {
+
+  public ControllerPublishVolumeEvent(Volume volume) {
+    super(volume, VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java
new file mode 100644
index 0000000..5e0c5e3
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/ValidateVolumeEvent.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+
+/**
+ * Validate volume capability with the CSI driver.
+ */
+public class ValidateVolumeEvent extends VolumeEvent {
+
+  public ValidateVolumeEvent(Volume volume) {
+    super(volume, VolumeEventType.VALIDATE_VOLUME_EVENT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java
new file mode 100644
index 0000000..2a33887
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEvent.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle.Volume;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+/**
+ * Base volume event class that used to trigger volume state transitions.
+ */
+public class VolumeEvent extends AbstractEvent<VolumeEventType> {
+
+  private Volume volume;
+
+  public VolumeEvent(Volume volume, VolumeEventType volumeEventType) {
+    super(volumeEventType, System.currentTimeMillis());
+    this.volume = volume;
+  }
+
+  public Volume getVolume() {
+    return this.volume;
+  }
+
+  public VolumeId getVolumeId() {
+    return this.volume.getVolumeId();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java
new file mode 100644
index 0000000..572e60d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/VolumeEventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+/**
+ * Volume events.
+ */
+public enum VolumeEventType {
+  VALIDATE_VOLUME_EVENT,
+  CREATE_VOLUME_EVENT,
+  CONTROLLER_PUBLISH_VOLUME_EVENT,
+  CONTROLLER_UNPUBLISH_VOLUME_EVENT,
+  DELETE_VOLUME
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java
new file mode 100644
index 0000000..7d53281
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/event/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains volume related events.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
new file mode 100644
index 0000000..68e89b0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/Volume.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.event.EventHandler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+
+/**
+ * Major volume interface at RM's view, it maintains the volume states and
+ * state transition according to the CSI volume lifecycle.
+ */
+@Private
+@Unstable
+public interface Volume extends EventHandler<VolumeEvent> {
+
+  VolumeState getVolumeState();
+
+  VolumeId getVolumeId();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
new file mode 100644
index 0000000..2515047
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.CsiAdaptorClient;
+import org.apache.hadoop.yarn.server.volume.csi.CsiAdaptorClientProtocol;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.event.VolumeEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeId;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import org.apache.hadoop.yarn.server.volume.csi.exception.VolumeException;
+
+import java.util.EnumSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class maintains the volume states and state transition
+ * according to the CSI volume lifecycle. Volume states are stored in
+ * {@link 
org.apache.hadoop.yarn.server.resourcemanager.volume.csi.VolumeStates}
+ * class.
+ */
+public class VolumeImpl implements Volume {
+
+  private static final Log LOG = LogFactory.getLog(VolumeImpl.class);
+
+  private final Lock readLock;
+  private final Lock writeLock;
+  private final StateMachine<VolumeState, VolumeEventType, VolumeEvent>
+      stateMachine;
+
+  private final VolumeId volumeId;
+  private final VolumeMetaData volumeMeta;
+  private CsiAdaptorClientProtocol client;
+
+  public VolumeImpl(VolumeMetaData volumeMeta) {
+    ReadWriteLock lock = new ReentrantReadWriteLock();
+    this.writeLock = lock.writeLock();
+    this.readLock = lock.readLock();
+    this.volumeId = volumeMeta.getVolumeId();
+    this.volumeMeta = volumeMeta;
+    this.stateMachine = createVolumeStateFactory().make(this);
+    this.client = new CsiAdaptorClient();
+  }
+
+  @VisibleForTesting
+  public void setClient(CsiAdaptorClientProtocol client) {
+    this.client = client;
+  }
+
+  public CsiAdaptorClientProtocol getClient() {
+    return this.client;
+  }
+
+  private StateMachineFactory<VolumeImpl, VolumeState,
+      VolumeEventType, VolumeEvent> createVolumeStateFactory() {
+    return new StateMachineFactory<
+        VolumeImpl, VolumeState, VolumeEventType, VolumeEvent>(VolumeState.NEW)
+        .addTransition(
+            VolumeState.NEW,
+            EnumSet.of(VolumeState.VALIDATED, VolumeState.UNAVAILABLE),
+            VolumeEventType.VALIDATE_VOLUME_EVENT,
+            new ValidateVolumeTransition())
+        .addTransition(VolumeState.VALIDATED, VolumeState.VALIDATED,
+            VolumeEventType.VALIDATE_VOLUME_EVENT)
+        .addTransition(
+            VolumeState.VALIDATED,
+            EnumSet.of(VolumeState.NODE_READY, VolumeState.UNAVAILABLE),
+            VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT,
+            new ControllerPublishVolumeTransition())
+        .addTransition(
+            VolumeState.UNAVAILABLE,
+            EnumSet.of(VolumeState.UNAVAILABLE, VolumeState.VALIDATED),
+            VolumeEventType.VALIDATE_VOLUME_EVENT,
+            new ValidateVolumeTransition())
+        .addTransition(
+            VolumeState.UNAVAILABLE,
+            VolumeState.UNAVAILABLE,
+            EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT))
+        .addTransition(
+            VolumeState.NODE_READY,
+            VolumeState.NODE_READY,
+            EnumSet.of(VolumeEventType.CONTROLLER_PUBLISH_VOLUME_EVENT,
+                VolumeEventType.VALIDATE_VOLUME_EVENT))
+        .installTopology();
+  }
+
+  @Override
+  public VolumeState getVolumeState() {
+    try {
+      readLock.lock();
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public VolumeId getVolumeId() {
+    try {
+      readLock.lock();
+      return this.volumeId;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static class ValidateVolumeTransition
+      implements MultipleArcTransition<VolumeImpl, VolumeEvent, VolumeState> {
+    @Override
+    public VolumeState transition(VolumeImpl volume,
+        VolumeEvent volumeEvent) {
+      try {
+        // this call could cross node, we should keep the message tight
+        volume.getClient().validateVolume();
+        return VolumeState.VALIDATED;
+      } catch (VolumeException e) {
+        LOG.warn("Got exception while calling the CSI adaptor", e);
+        return VolumeState.UNAVAILABLE;
+      }
+    }
+  }
+
+  private static class ControllerPublishVolumeTransition
+      implements MultipleArcTransition<VolumeImpl, VolumeEvent, VolumeState> {
+
+    @Override
+    public VolumeState transition(VolumeImpl volume,
+        VolumeEvent volumeEvent) {
+      try {
+        // this call could cross node, we should keep the message tight
+        volume.getClient().controllerPublishVolume();
+        return VolumeState.NODE_READY;
+      } catch (VolumeException e) {
+        LOG.warn("Got exception while calling the CSI adaptor", e);
+        return volume.getVolumeState();
+      }
+    }
+  }
+
+  @Override
+  public void handle(VolumeEvent event) {
+    try {
+      this.writeLock.lock();
+      VolumeId volumeId = event.getVolumeId();
+
+      if (volumeId == null) {
+        // This should not happen, safely ignore the event
+        LOG.warn("Unexpected volume event received, event type is "
+            + event.getType().name() + ", but the volumeId is null.");
+        return;
+      }
+
+      LOG.info("Processing volume event, type=" + event.getType().name()
+          + ", volumeId=" + volumeId.toString());
+
+      VolumeState oldState = null;
+      VolumeState newState = null;
+      try {
+        oldState = stateMachine.getCurrentState();
+        newState = stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.warn("Can't handle this event at current state: Current: ["
+            + oldState + "], eventType: [" + event.getType() + "]," +
+            " volumeId: [" + volumeId + "]", e);
+      }
+
+      if (newState != null && oldState != newState) {
+        LOG.info("VolumeImpl " + volumeId + " transitioned from " + oldState
+            + " to " + newState);
+      }
+    }finally {
+      this.writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java
new file mode 100644
index 0000000..9beb09a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeState.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+/**
+ * Volume states
+ * Volume states are defined in the CSI spec, see more in volume lifecycle.
+ */
+public enum VolumeState {
+  // initial state
+  NEW,
+  // volume capacity validated
+  VALIDATED,
+  // volume created by the controller
+  CREATED,
+  // controller published the volume
+  NODE_READY,
+  // unavailable
+  UNAVAILABLE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java
new file mode 100644
index 0000000..a9dd389
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage volume lifecycle.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi.lifecycle;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4e728444/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java
new file mode 100644
index 0000000..5d71617
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to manage CSI volumes.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.volume.csi;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to