This is an automated email from the ASF dual-hosted git repository.
sunilg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f4906ac YARN-9038. [CSI] Add ability to publish/unpublish volumes on
node managers. Contributed by Weiwei Yang.
f4906ac is described below
commit f4906ac01960c78ff0c91797eaa4b36d80f6826d
Author: Sunil G <[email protected]>
AuthorDate: Fri Jan 4 12:10:00 2019 +0530
YARN-9038. [CSI] Add ability to publish/unpublish volumes on node managers.
Contributed by Weiwei Yang.
---
.../apache/hadoop/yarn/api/CsiAdaptorProtocol.java | 44 +++++
.../protocolrecords/NodePublishVolumeRequest.java | 94 ++++++++++
.../NodePublishVolumeResponse.java} | 23 +--
.../NodeUnpublishVolumeRequest.java} | 32 ++--
.../NodeUnpublishVolumeResponse.java} | 23 +--
.../yarn/api/records/ResourceInformation.java | 4 +-
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 19 +-
.../hadoop/yarn/util/csi/CsiConfigUtils.java} | 28 ++-
.../apache/hadoop/yarn/util/csi/package-info.java} | 23 +--
.../src/main/proto/YarnCsiAdaptor.proto | 6 +
.../src/main/proto/yarn_csi_adaptor.proto | 24 +++
.../service/api/records/ResourceInformation.java | 17 ++
.../hadoop/yarn/service/component/Component.java | 1 +
.../yarn/service/conf/TestAppJsonResolve.java | 1 +
.../yarn/service/conf/examples/external3.json | 1 +
.../pb/client/CsiAdaptorProtocolPBClientImpl.java | 36 ++++
.../service/CsiAdaptorProtocolPBServiceImpl.java | 36 ++++
.../impl/pb/NodePublishVolumeRequestPBImpl.java | 201 ++++++++++++++++++++
.../impl/pb/NodePublishVolumeResponsePBImpl.java | 62 +++++++
.../impl/pb/NodeUnpublishVolumeRequestPBImpl.java | 89 +++++++++
.../impl/pb/NodeUnpublishVolumeResponsePBImpl.java | 61 ++++++
.../src/main/resources/yarn-default.xml | 15 ++
.../hadoop-yarn/hadoop-yarn-csi/pom.xml | 18 ++
.../csi/adaptor/CsiAdaptorProtocolService.java | 76 +++++++-
.../apache/hadoop/yarn/csi/client/CsiClient.java | 6 +
.../hadoop/yarn/csi/client/CsiClientImpl.java | 20 ++
.../GetPluginInfoResponseProtoTranslator.java | 2 +-
.../NodePublishVolumeRequestProtoTranslator.java | 77 ++++++++
...NodeUnpublishVolumeRequestProtoTranslator.java} | 33 ++--
.../csi/translator/ProtoTranslatorFactory.java | 12 ++
.../yarn/csi/adaptor/TestCsiAdaptorService.java | 24 ++-
.../csi/adaptor/TestNodePublishVolumeRequest.java | 55 ++++++
.../hadoop/yarn/csi/client/ICsiClientTest.java} | 40 ++--
.../containermanager/container/Container.java | 4 +
.../containermanager/container/ContainerImpl.java | 11 ++
.../containermanager/launcher/ContainerLaunch.java | 32 ++++
.../linux/runtime/DockerLinuxContainerRuntime.java | 64 +++++++
.../localizer/ContainerLocalizer.java | 1 +
.../volume/csi/ContainerVolumePublisher.java | 205 +++++++++++++++++++++
.../containermanager/volume/csi/package-info.java} | 22 +--
.../executor/ContainerStartContext.java | 12 ++
.../server/nodemanager/webapp/MockContainer.java | 10 +
.../volume/csi/lifecycle/VolumeImpl.java | 2 +
43 files changed, 1441 insertions(+), 125 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
index 0822163..4e064eb 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/CsiAdaptorProtocol.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.api;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -30,10 +34,50 @@ import java.io.IOException;
*/
public interface CsiAdaptorProtocol {
+ /**
+ * Get plugin info from the CSI driver. The driver usually returns
+ * the name of the driver and its version.
+ * @param request get plugin info request.
+ * @return response that contains driver name and its version.
+ * @throws YarnException
+ * @throws IOException
+ */
GetPluginInfoResponse getPluginInfo(GetPluginInfoRequest request)
throws YarnException, IOException;
+ /**
+ * Validate if the volume capacity can be satisfied on the underneath
+ * storage system. This method responses if the capacity can be satisfied
+ * or not, with a detailed message.
+ * @param request validate volume capability request.
+ * @return validation response.
+ * @throws YarnException
+ * @throws IOException
+ */
ValidateVolumeCapabilitiesResponse validateVolumeCapacity(
ValidateVolumeCapabilitiesRequest request) throws YarnException,
IOException;
+
+ /**
+ * Publish the volume on a node manager, the volume will be mounted
+ * to the local file system and become visible for clients.
+ * @param request publish volume request.
+ * @return publish volume response.
+ * @throws YarnException
+ * @throws IOException
+ */
+ NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws YarnException, IOException;
+
+ /**
+ * This is a reverse operation of
+ * {@link #nodePublishVolume(NodePublishVolumeRequest)}, it un-mounts the
+ * volume from given node.
+ * @param request un-publish volume request.
+ * @return un-publish volume response.
+ * @throws YarnException
+ * @throws IOException
+ */
+ NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException;
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java
new file mode 100644
index 0000000..43c605f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeRequest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import com.google.gson.JsonObject;
+import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.Map;
+
+/**
+ * The request sent by node manager to CSI driver adaptor
+ * to publish a volume on a node.
+ */
+public abstract class NodePublishVolumeRequest {
+
+ public static NodePublishVolumeRequest newInstance(String volumeId,
+ boolean readOnly, String targetPath, String stagingPath,
+ VolumeCapability capability,
+ Map<String, String> publishContext,
+ Map<String, String> secrets) {
+ NodePublishVolumeRequest request =
+ Records.newRecord(NodePublishVolumeRequest.class);
+ request.setVolumeId(volumeId);
+ request.setReadonly(readOnly);
+ request.setTargetPath(targetPath);
+ request.setStagingPath(stagingPath);
+ request.setVolumeCapability(capability);
+ request.setPublishContext(publishContext);
+ request.setSecrets(secrets);
+ return request;
+ }
+
+ public abstract void setVolumeId(String volumeId);
+
+ public abstract String getVolumeId();
+
+ public abstract void setReadonly(boolean readonly);
+
+ public abstract boolean getReadOnly();
+
+ public abstract void setTargetPath(String targetPath);
+
+ public abstract String getTargetPath();
+
+ public abstract void setStagingPath(String stagingPath);
+
+ public abstract String getStagingPath();
+
+ public abstract void setVolumeCapability(VolumeCapability capability);
+
+ public abstract VolumeCapability getVolumeCapability();
+
+ public abstract void setPublishContext(Map<String, String> publishContext);
+
+ public abstract Map<String, String> getPublishContext();
+
+ public abstract void setSecrets(Map<String, String> secrets);
+
+ public abstract Map<String, String> getSecrets();
+
+ public String toString() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("VolumeId", getVolumeId());
+ jsonObject.addProperty("ReadOnly", getReadOnly());
+ jsonObject.addProperty("TargetPath", getTargetPath());
+ jsonObject.addProperty("StagingPath", getStagingPath());
+ if (getVolumeCapability() != null) {
+ JsonObject jsonCap = new JsonObject();
+ jsonCap.addProperty("AccessMode",
+ getVolumeCapability().getAccessMode().name());
+ jsonCap.addProperty("VolumeType",
+ getVolumeCapability().getVolumeType().name());
+ jsonObject.addProperty("VolumeCapability",
+ jsonCap.toString());
+ }
+ return jsonObject.toString();
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java
similarity index 62%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java
index 9dcb8a7..c2377aa 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodePublishVolumeResponse.java
@@ -15,20 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.yarn.api.protocolrecords;
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "CsiAdaptorProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
+import org.apache.hadoop.yarn.util.Records;
-import "yarn_csi_adaptor.proto";
-
-service CsiAdaptorProtocolService {
-
- rpc getPluginInfo (GetPluginInfoRequest)
- returns (GetPluginInfoResponse);
+/**
+ * The response sent by a CSI driver adaptor to the node manager
+ * after publishing a volume on the node.
+ */
+public abstract class NodePublishVolumeResponse {
- rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
- returns (ValidateVolumeCapabilitiesResponse);
+ public static NodePublishVolumeResponse newInstance() {
+ return Records.newRecord(NodePublishVolumeResponse.class);
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java
similarity index 52%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java
index 9dcb8a7..48c16e2 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeRequest.java
@@ -15,20 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.yarn.api.protocolrecords;
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "CsiAdaptorProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
+import org.apache.hadoop.yarn.util.Records;
-import "yarn_csi_adaptor.proto";
+/**
+ * The request sent by node manager to CSI driver adaptor
+ * to un-publish a volume on a node.
+ */
+public abstract class NodeUnpublishVolumeRequest {
+
+ public static NodeUnpublishVolumeRequest newInstance(String volumeId,
+ String targetPath) {
+ NodeUnpublishVolumeRequest request =
+ Records.newRecord(NodeUnpublishVolumeRequest.class);
+ request.setVolumeId(volumeId);
+ request.setTargetPath(targetPath);
+ return request;
+ }
+
+ public abstract void setVolumeId(String volumeId);
-service CsiAdaptorProtocolService {
+ public abstract void setTargetPath(String targetPath);
- rpc getPluginInfo (GetPluginInfoRequest)
- returns (GetPluginInfoResponse);
+ public abstract String getVolumeId();
- rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
- returns (ValidateVolumeCapabilitiesResponse);
+ public abstract String getTargetPath();
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java
similarity index 63%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java
index 9dcb8a7..1b339c0 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/NodeUnpublishVolumeResponse.java
@@ -15,20 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.yarn.api.protocolrecords;
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "CsiAdaptorProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
+import org.apache.hadoop.yarn.util.Records;
-import "yarn_csi_adaptor.proto";
-
-service CsiAdaptorProtocolService {
-
- rpc getPluginInfo (GetPluginInfoRequest)
- returns (GetPluginInfoResponse);
+/**
+ * The response sent by a CSI driver adaptor to the node manager
+ * after un-publishing a volume on the node.
+ */
+public class NodeUnpublishVolumeResponse {
- rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
- returns (ValidateVolumeCapabilitiesResponse);
+ public static NodeUnpublishVolumeResponse newInstance() {
+ return Records.newRecord(NodeUnpublishVolumeResponse.class);
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index 047c09a..4209ca7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -276,10 +276,10 @@ public class ResourceInformation implements
Comparable<ResourceInformation> {
}
public static ResourceInformation newInstance(String name, String units,
- long value, Map<String, String> attributes) {
+ long value, Set<String> tags, Map<String, String> attributes) {
return ResourceInformation
.newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
- Long.MAX_VALUE, null, attributes);
+ Long.MAX_VALUE, tags, attributes);
}
public static ResourceInformation newInstance(String name, String units,
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 3bc9957..511edef 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -3443,13 +3443,28 @@ public class YarnConfiguration extends Configuration {
// CSI Volume configs
////////////////////////////////
/**
- * One or more socket addresses for csi-adaptor.
- * Multiple addresses are delimited by ",".
+ * TERMS:
+ * csi-driver: a 3rd party CSI driver which implements the CSI protocol.
+ * It is provided by the storage system.
+ * csi-driver-adaptor: this is an internal RPC service working
+ * as a bridge between YARN and a csi-driver.
*/
public static final String NM_CSI_ADAPTOR_PREFIX =
NM_PREFIX + "csi-driver-adaptor.";
+ public static final String NM_CSI_DRIVER_PREFIX =
+ NM_PREFIX + "csi-driver.";
+ public static final String NM_CSI_DRIVER_ENDPOINT_SUFFIX =
+ ".endpoint";
+ public static final String NM_CSI_ADAPTOR_ADDRESS_SUFFIX =
+ ".address";
+ /**
+ * One or more socket addresses for csi-adaptor.
+ * Multiple addresses are delimited by ",".
+ */
public static final String NM_CSI_ADAPTOR_ADDRESSES =
NM_CSI_ADAPTOR_PREFIX + "addresses";
+ public static final String NM_CSI_DRIVER_NAMES =
+ NM_CSI_DRIVER_PREFIX + "names";
////////////////////////////////
// Other Configs
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java
similarity index 67%
rename from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
rename to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java
index 77e6955..e117705 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/utils/ConfigUtils.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/CsiConfigUtils.java
@@ -15,8 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.csi.utils;
+package org.apache.hadoop.yarn.util.csi;
+import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -24,13 +25,30 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import java.net.InetSocketAddress;
/**
- * Utility class to load configurations.
+ * Utility class for CSI in the API level.
*/
-public final class ConfigUtils {
+public final class CsiConfigUtils {
- private ConfigUtils() {
+ private CsiConfigUtils() {
// Hide constructor for utility class.
}
+
+ public static String[] getCsiDriverNames(Configuration conf) {
+ return conf.getStrings(YarnConfiguration.NM_CSI_DRIVER_NAMES);
+ }
+
+ public static String getCsiDriverEndpoint(String driverName,
+ Configuration conf) throws YarnException {
+ String driverEndpointProperty = YarnConfiguration.NM_CSI_DRIVER_PREFIX
+ + driverName + YarnConfiguration.NM_CSI_DRIVER_ENDPOINT_SUFFIX;
+ String driverEndpoint = conf.get(driverEndpointProperty);
+ if (Strings.isNullOrEmpty(driverEndpoint)) {
+ throw new YarnException("CSI driver's endpoint is not specified or"
+ + " invalid, property "+ driverEndpointProperty + " is not defined");
+ }
+ return driverEndpoint;
+ }
+
/**
* Resolve the CSI adaptor address for a CSI driver from configuration.
* Expected configuration property name is
@@ -43,7 +61,7 @@ public final class ConfigUtils {
public static InetSocketAddress getCsiAdaptorAddressForDriver(
String driverName, Configuration conf) throws YarnException {
String configName = YarnConfiguration.NM_CSI_ADAPTOR_PREFIX
- + driverName + ".address";
+ + driverName + YarnConfiguration.NM_CSI_ADAPTOR_ADDRESS_SUFFIX;
String errorMessage = "Failed to load CSI adaptor address for driver "
+ driverName + ", configuration property " + configName
+ " is not defined or invalid.";
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java
similarity index 62%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java
index 9dcb8a7..18b7057 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/csi/package-info.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,20 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "CsiAdaptorProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
-
-import "yarn_csi_adaptor.proto";
-
-service CsiAdaptorProtocolService {
-
- rpc getPluginInfo (GetPluginInfoRequest)
- returns (GetPluginInfoResponse);
-
- rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
- returns (ValidateVolumeCapabilitiesResponse);
-}
+/**
+ * Package that includes some CSI utility classes.
+ */
+package org.apache.hadoop.yarn.util.csi;
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
index 9dcb8a7..146f5bf 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
@@ -31,4 +31,10 @@ service CsiAdaptorProtocolService {
rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
returns (ValidateVolumeCapabilitiesResponse);
+
+ rpc nodePublishVolume (NodePublishVolumeRequest)
+ returns (NodePublishVolumeResponse);
+
+ rpc nodeUnpublishVolume (NodeUnpublishVolumeRequest)
+ returns (NodeUnpublishVolumeResponse);
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
index c9adbea..9b645e1 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_csi_adaptor.proto
@@ -66,4 +66,28 @@ message GetPluginInfoRequest {
message GetPluginInfoResponse {
required string name = 1;
required string vendor_version = 2;
+}
+
+message NodePublishVolumeRequest {
+ required string volume_id = 1;
+ repeated StringStringMapProto publish_context = 2;
+ optional string staging_target_path = 3;
+ required string target_path = 4;
+ required VolumeCapability volume_capability = 5;
+ required bool readonly = 6;
+ repeated StringStringMapProto secrets = 7;
+ repeated StringStringMapProto volume_context = 8;
+}
+
+message NodePublishVolumeResponse {
+ // Intentionally empty.
+}
+
+message NodeUnpublishVolumeRequest {
+ required string volume_id = 1;
+ required string target_path = 2;
+}
+
+message NodeUnpublishVolumeResponse {
+ // Intentionally empty.
}
\ No newline at end of file
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
index e466ce7..f868176 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ResourceInformation.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.service.api.records;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* ResourceInformation determines unit/name/value of resource types in
addition to memory and vcores. It will be part of Resource object
@@ -40,11 +42,25 @@ public class ResourceInformation {
@SerializedName("attributes")
private Map<String, String> attributes = null;
+ @SerializedName("tags")
+ private Set<String> tags = null;
+
public ResourceInformation value(Long value) {
this.value = value;
return this;
}
+ public ResourceInformation tags(Set<String> resourceTags) {
+ this.tags = resourceTags;
+ return this;
+ }
+
+ @ApiModelProperty(value = "")
+ @JsonProperty("tags")
+ public Set<String> getTags() {
+ return tags == null ? ImmutableSet.of() : tags;
+ }
+
@ApiModelProperty(value = "")
@JsonProperty("attributes")
public Map<String, String> getAttributes() {
@@ -116,6 +132,7 @@ public class ResourceInformation {
sb.append(" unit: ").append(toIndentedString(unit)).append("\n");
sb.append(" attributes: ").append(toIndentedString(attributes))
.append("\n");
+ sb.append(" tags: ").append(toIndentedString(tags)).append("\n");
sb.append("}");
return sb.toString();
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 9895fba..f885b25 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -755,6 +755,7 @@ public class Component implements
EventHandler<ComponentEvent> {
entry.getKey(),
specInfo.getUnit(),
specInfo.getValue(),
+ specInfo.getTags(),
specInfo.getAttributes());
resource.setResourceInformation(resourceName, ri);
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java
index 25c502f..04c84dc 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/conf/TestAppJsonResolve.java
@@ -231,5 +231,6 @@ public class TestAppJsonResolve extends Assert {
Assert.assertEquals("yarn.io/csi-volume", volume.getKey());
Assert.assertEquals(100L, volume.getValue().getValue().longValue());
Assert.assertEquals(2, volume.getValue().getAttributes().size());
+ Assert.assertEquals(1, volume.getValue().getTags().size());
}
}
\ No newline at end of file
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json
index 74569bd..ef8e323 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/org/apache/hadoop/yarn/service/conf/examples/external3.json
@@ -14,6 +14,7 @@
"yarn.io/csi-volume": {
"value": 100,
"unit": "Gi",
+ "tags": ["sample-tag"],
"attributes" : {
"driver" : "hostpath",
"mountPath" : "/mnt/data"
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
index 2e10f72..a43d087 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/CsiAdaptorProtocolPBClientImpl.java
@@ -25,10 +25,18 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -83,6 +91,34 @@ public class CsiAdaptorProtocolPBClientImpl
}
@Override
+ public NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws IOException, YarnException {
+ CsiAdaptorProtos.NodePublishVolumeRequest requestProto =
+ ((NodePublishVolumeRequestPBImpl) request).getProto();
+ try {
+ return new NodePublishVolumeResponsePBImpl(
+ proxy.nodePublishVolume(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
+ public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+ CsiAdaptorProtos.NodeUnpublishVolumeRequest requestProto =
+ ((NodeUnpublishVolumeRequestPBImpl) request).getProto();
+ try {
+ return new NodeUnpublishVolumeResponsePBImpl(
+ proxy.nodeUnpublishVolume(null, requestProto));
+ } catch (ServiceException e) {
+ RPCUtil.unwrapAndThrowException(e);
+ return null;
+ }
+ }
+
+ @Override
public void close() throws IOException {
if(this.proxy != null) {
RPC.stopProxy(this.proxy);
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
index 9a19435..624ad37 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/CsiAdaptorProtocolPBServiceImpl.java
@@ -23,9 +23,15 @@ import org.apache.hadoop.yarn.api.CsiAdaptorPB;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetPluginInfoResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeResponsePBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeRequestPBImpl;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodeUnpublishVolumeResponsePBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesResponsePBImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -72,4 +78,34 @@ public class CsiAdaptorProtocolPBServiceImpl implements
CsiAdaptorPB {
throw new ServiceException(e);
}
}
+
+ @Override
+ public CsiAdaptorProtos.NodePublishVolumeResponse nodePublishVolume(
+ RpcController controller,
+ CsiAdaptorProtos.NodePublishVolumeRequest request)
+ throws ServiceException {
+ try {
+ NodePublishVolumeRequestPBImpl req =
+ new NodePublishVolumeRequestPBImpl(request);
+ NodePublishVolumeResponse response = real.nodePublishVolume(req);
+ return ((NodePublishVolumeResponsePBImpl) response).getProto();
+ } catch (YarnException | IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public CsiAdaptorProtos.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ RpcController controller,
+ CsiAdaptorProtos.NodeUnpublishVolumeRequest request)
+ throws ServiceException {
+ try {
+ NodeUnpublishVolumeRequestPBImpl req =
+ new NodeUnpublishVolumeRequestPBImpl(request);
+ NodeUnpublishVolumeResponse response = real.nodeUnpublishVolume(req);
+ return ((NodeUnpublishVolumeResponsePBImpl) response).getProto();
+ } catch (YarnException | IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java
new file mode 100644
index 0000000..c359023
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeRequestPBImpl.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest.VolumeCapability;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+import java.util.Map;
+
+/**
+ * Request to publish volume on node manager.
+ */
+public class NodePublishVolumeRequestPBImpl extends
+ NodePublishVolumeRequest {
+
+ private CsiAdaptorProtos.NodePublishVolumeRequest.Builder builder;
+
+ public NodePublishVolumeRequestPBImpl() {
+ this.builder = CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder();
+ }
+
+ public NodePublishVolumeRequestPBImpl(
+ CsiAdaptorProtos.NodePublishVolumeRequest request) {
+ this.builder = request.toBuilder();
+ }
+
+ public CsiAdaptorProtos.NodePublishVolumeRequest getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public void setVolumeId(String volumeId) {
+ Preconditions.checkNotNull(builder);
+ builder.setVolumeId(volumeId);
+ }
+
+ @Override
+ public String getVolumeId() {
+ Preconditions.checkNotNull(builder);
+ return builder.getVolumeId();
+ }
+
+ @Override
+ public void setReadonly(boolean readonly) {
+ Preconditions.checkNotNull(builder);
+ builder.setReadonly(readonly);
+ }
+
+ @Override
+ public boolean getReadOnly() {
+ Preconditions.checkNotNull(builder);
+ return builder.getReadonly();
+ }
+
+ @Override
+ public void setSecrets(Map<String, String> secrets) {
+ if (secrets != null) {
+ Preconditions.checkNotNull(builder);
+ for(Map.Entry<String, String> entry : secrets.entrySet()) {
+ YarnProtos.StringStringMapProto mapEntry =
+ YarnProtos.StringStringMapProto.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build();
+ builder.addSecrets(mapEntry);
+ }
+ }
+ }
+
+ @Override
+ public Map<String, String> getSecrets() {
+ Preconditions.checkNotNull(builder);
+ return builder.getSecretsCount() > 0 ?
+ ProtoUtils.convertStringStringMapProtoListToMap(
+ builder.getSecretsList()) : ImmutableMap.of();
+ }
+
+ @Override
+ public String getTargetPath() {
+ Preconditions.checkNotNull(builder);
+ return builder.getTargetPath();
+ }
+
+ @Override
+ public void setStagingPath(String stagingPath) {
+ Preconditions.checkNotNull(builder);
+ builder.setStagingTargetPath(stagingPath);
+ }
+
+ @Override
+ public String getStagingPath() {
+ Preconditions.checkNotNull(builder);
+ return builder.getStagingTargetPath();
+ }
+
+ @Override
+ public void setPublishContext(Map<String, String> publishContext) {
+ if (publishContext != null) {
+ Preconditions.checkNotNull(builder);
+ for(Map.Entry<String, String> entry : publishContext.entrySet()) {
+ YarnProtos.StringStringMapProto mapEntry =
+ YarnProtos.StringStringMapProto.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue())
+ .build();
+ builder.addPublishContext(mapEntry);
+ }
+ }
+ }
+
+ @Override
+ public Map<String, String> getPublishContext() {
+ Preconditions.checkNotNull(builder);
+ return builder.getPublishContextCount() > 0 ?
+ ProtoUtils.convertStringStringMapProtoListToMap(
+ builder.getPublishContextList()) : ImmutableMap.of();
+ }
+
+ @Override
+ public void setTargetPath(String targetPath) {
+ if (targetPath != null) {
+ Preconditions.checkNotNull(builder);
+ builder.setTargetPath(targetPath);
+ }
+ }
+
+ @Override
+ public void setVolumeCapability(
+ VolumeCapability capability) {
+ if (capability != null) {
+ CsiAdaptorProtos.VolumeCapability vc =
+ CsiAdaptorProtos.VolumeCapability.newBuilder()
+ .setAccessMode(CsiAdaptorProtos.VolumeCapability
+ .AccessMode.valueOf(
+ capability.getAccessMode().ordinal()))
+ .setVolumeType(CsiAdaptorProtos.VolumeCapability
+ .VolumeType.valueOf(capability.getVolumeType().ordinal()))
+ .addAllMountFlags(capability.getMountFlags())
+ .build();
+ builder.setVolumeCapability(vc);
+ }
+ }
+
+ @Override
+ public VolumeCapability getVolumeCapability() {
+ CsiAdaptorProtos.VolumeCapability cap0 = builder.getVolumeCapability();
+ if (builder.hasVolumeCapability()) {
+ return new VolumeCapability(
+ ValidateVolumeCapabilitiesRequest.AccessMode
+ .valueOf(cap0.getAccessMode().name()),
+ ValidateVolumeCapabilitiesRequest.VolumeType
+ .valueOf(cap0.getVolumeType().name()),
+ cap0.getMountFlagsList());
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java
new file mode 100644
index 0000000..cbdf91f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodePublishVolumeResponsePBImpl.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Protobuf record class for node publish response.
+ */
+public class NodePublishVolumeResponsePBImpl
+ extends NodePublishVolumeResponse {
+
+ private CsiAdaptorProtos.NodePublishVolumeResponse.Builder builder;
+
+ public NodePublishVolumeResponsePBImpl(
+ CsiAdaptorProtos.NodePublishVolumeResponse proto) {
+ this.builder = proto.toBuilder();
+ }
+
+ public NodePublishVolumeResponsePBImpl() {
+ this.builder = CsiAdaptorProtos.NodePublishVolumeResponse
+ .newBuilder();
+ }
+
+ public CsiAdaptorProtos.NodePublishVolumeResponse getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java
new file mode 100644
index 0000000..455b1f7
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeRequestPBImpl.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * The protobuf record class for request to un-publish volume on node manager.
+ */
+public class NodeUnpublishVolumeRequestPBImpl extends
+ NodeUnpublishVolumeRequest {
+
+ private CsiAdaptorProtos.NodeUnpublishVolumeRequest.Builder builder;
+
+ public NodeUnpublishVolumeRequestPBImpl() {
+ this.builder = CsiAdaptorProtos.NodeUnpublishVolumeRequest.newBuilder();
+ }
+
+ public NodeUnpublishVolumeRequestPBImpl(
+ CsiAdaptorProtos.NodeUnpublishVolumeRequest request) {
+ this.builder = request.toBuilder();
+ }
+
+ public CsiAdaptorProtos.NodeUnpublishVolumeRequest getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public void setVolumeId(String volumeId) {
+ Preconditions.checkNotNull(builder);
+ this.builder.setVolumeId(volumeId);
+ }
+
+ @Override
+ public void setTargetPath(String targetPath) {
+ Preconditions.checkNotNull(builder);
+ this.builder.setTargetPath(targetPath);
+ }
+
+ @Override
+ public String getVolumeId() {
+ return builder.getVolumeId();
+ }
+
+ @Override
+ public String getTargetPath() {
+ return builder.getTargetPath();
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java
new file mode 100644
index 0000000..8406e41
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/NodeUnpublishVolumeResponsePBImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+
+/**
+ * Response to the un-publish volume request on node manager.
+ */
+public class NodeUnpublishVolumeResponsePBImpl extends
+ NodeUnpublishVolumeResponse {
+
+ private CsiAdaptorProtos.NodeUnpublishVolumeResponse.Builder builder;
+
+ public NodeUnpublishVolumeResponsePBImpl() {
+ this.builder = CsiAdaptorProtos.NodeUnpublishVolumeResponse.newBuilder();
+ }
+
+ public NodeUnpublishVolumeResponsePBImpl(
+ CsiAdaptorProtos.NodeUnpublishVolumeResponse response) {
+ this.builder = response.toBuilder();
+ }
+
+ public CsiAdaptorProtos.NodeUnpublishVolumeResponse getProto() {
+ Preconditions.checkNotNull(builder);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index ff64c90..a0e0eda 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4083,4 +4083,19 @@
<name>yarn.nodemanager.csi-driver-adaptor.addresses</name>
<value></value>
</property>
+
+ <property>
+ <description>
+ CSI driver names running on this node, multiple driver names need to
+ be delimited by comma. The driver name should be same value returned
+ by the getPluginInfo call. For each of the CSI driver name, it must
+ to define following two corresponding properties:
+ "yarn.nodemanager.csi-driver.${NAME}.endpoint"
+ "yarn.nodemanager.csi-driver-adaptor.${NAME}.address"
+ The 1st property defines where the driver's endpoint is;
+ 2nd property defines where the mapping csi-driver-adaptor's address is.
+ </description>
+ <name>yarn.nodemanager.csi-driver.names</name>
+ <value></value>
+ </property>
</configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
index 1a19f0e..44c2607 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
@@ -180,6 +180,24 @@
<excludePackageNames>csi.v0</excludePackageNames>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+
<source>${basedir}/target/generated-sources/protobuf/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
\ No newline at end of file
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
index f94275f..7020f06 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/adaptor/CsiAdaptorProtocolService.java
@@ -21,29 +21,36 @@ import com.google.common.annotations.VisibleForTesting;
import csi.v0.Csi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeResponse;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
import org.apache.hadoop.yarn.csi.client.CsiClient;
import org.apache.hadoop.yarn.csi.client.CsiClientImpl;
import org.apache.hadoop.yarn.csi.translator.ProtoTranslatorFactory;
-import org.apache.hadoop.yarn.csi.utils.ConfigUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
/**
* This is a Hadoop RPC server, we uses the Hadoop RPC framework here
* because we need to stick to the security model current Hadoop supports.
*/
-public class CsiAdaptorProtocolService extends AbstractService
+public class CsiAdaptorProtocolService extends AuxiliaryService
implements CsiAdaptorProtocol {
private static final Logger LOG =
@@ -54,6 +61,12 @@ public class CsiAdaptorProtocolService extends
AbstractService
private CsiClient csiClient;
private String csiDriverName;
+ public CsiAdaptorProtocolService() {
+ super(CsiAdaptorProtocolService.class.getName());
+ // TODO read this from configuration
+ this.csiDriverName = "ch.ctrox.csi.s3-driver";
+ }
+
public CsiAdaptorProtocolService(String driverName,
String domainSocketPath) {
super(CsiAdaptorProtocolService.class.getName());
@@ -68,7 +81,11 @@ public class CsiAdaptorProtocolService extends
AbstractService
@Override
protected void serviceInit(Configuration conf) throws Exception {
- adaptorServiceAddress = ConfigUtils
+
+ String driverEndpoint = CsiConfigUtils
+ .getCsiDriverEndpoint(csiDriverName, conf);
+ this.csiClient = new CsiClientImpl(driverEndpoint);
+ adaptorServiceAddress = CsiConfigUtils
.getCsiAdaptorAddressForDriver(csiDriverName, conf);
super.serviceInit(conf);
}
@@ -119,4 +136,55 @@ public class CsiAdaptorProtocolService extends
AbstractService
Csi.ValidateVolumeCapabilitiesResponse.class)
.convertFrom(response);
}
+
+ @Override
+ public NodePublishVolumeResponse nodePublishVolume(
+ NodePublishVolumeRequest request) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received nodePublishVolume call, request: {}",
+ request.toString());
+ }
+ Csi.NodePublishVolumeRequest req = ProtoTranslatorFactory
+ .getTranslator(NodePublishVolumeRequest.class,
+ Csi.NodePublishVolumeRequest.class).convertTo(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translate to CSI proto message: {}", req.toString());
+ }
+ csiClient.nodePublishVolume(req);
+ return NodePublishVolumeResponse.newInstance();
+ }
+
+ @Override
+ public NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ NodeUnpublishVolumeRequest request) throws YarnException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received nodeUnpublishVolume call, request: {}",
+ request.toString());
+ }
+ Csi.NodeUnpublishVolumeRequest req = ProtoTranslatorFactory
+ .getTranslator(NodeUnpublishVolumeRequest.class,
+ Csi.NodeUnpublishVolumeRequest.class).convertTo(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Translate to CSI proto message: {}", req.toString());
+ }
+ csiClient.nodeUnpublishVolume(req);
+ return NodeUnpublishVolumeResponse.newInstance();
+ }
+
+ @Override
+ public void initializeApplication(
+ ApplicationInitializationContext initAppContext) {
+ // do nothing
+ }
+
+ @Override
+ public void stopApplication(
+ ApplicationTerminationContext stopAppContext) {
+ // do nothing
+ }
+
+ @Override
+ public ByteBuffer getMetaData() {
+ return ByteBuffer.allocate(0);
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
index d31c0c9..837b667 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
@@ -40,4 +40,10 @@ public interface CsiClient {
Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
+
+ Csi.NodePublishVolumeResponse nodePublishVolume(
+ Csi.NodePublishVolumeRequest request) throws IOException;
+
+ Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ Csi.NodeUnpublishVolumeRequest request) throws IOException;
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
index 5b3d2e2..0a107e1 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClientImpl.java
@@ -59,4 +59,24 @@ public class CsiClientImpl implements CsiClient {
.validateVolumeCapabilities(request);
}
}
+
+ @Override
+ public Csi.NodePublishVolumeResponse nodePublishVolume(
+ Csi.NodePublishVolumeRequest request) throws IOException {
+ try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+ .setDomainSocketAddress(address).build()) {
+ return client.createNodeBlockingStub()
+ .nodePublishVolume(request);
+ }
+ }
+
+ @Override
+ public Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ Csi.NodeUnpublishVolumeRequest request) throws IOException {
+ try (CsiGrpcClient client = CsiGrpcClient.newBuilder()
+ .setDomainSocketAddress(address).build()) {
+ return client.createNodeBlockingStub()
+ .nodeUnpublishVolume(request);
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
index c4f042e..bcf634a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
* Protobuf message translator for GetPluginInfoResponse and
* Csi.GetPluginInfoResponse.
*/
-public class GetPluginInfoResponseProtoTranslator implements
+public class GetPluginInfoResponseProtoTranslator<A, B> implements
ProtoTranslator<GetPluginInfoResponse, Csi.GetPluginInfoResponse> {
@Override public Csi.GetPluginInfoResponse convertTo(
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java
new file mode 100644
index 0000000..e86dd3f
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodePublishVolumeRequestProtoTranslator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.translator;
+
+import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * This class helps to transform a YARN side NodePublishVolumeRequest
+ * to corresponding CSI protocol message.
+ * @param <A> YARN NodePublishVolumeRequest
+ * @param <B> CSI NodePublishVolumeRequest
+ */
+public class NodePublishVolumeRequestProtoTranslator<A, B> implements
+ ProtoTranslator<NodePublishVolumeRequest,
+ Csi.NodePublishVolumeRequest> {
+
+ @Override
+ public Csi.NodePublishVolumeRequest convertTo(
+ NodePublishVolumeRequest messageA) throws YarnException {
+ Csi.NodePublishVolumeRequest.Builder builder =
+ Csi.NodePublishVolumeRequest.newBuilder();
+ ValidateVolumeCapabilitiesRequest.VolumeCapability cap =
+ messageA.getVolumeCapability();
+ Csi.VolumeCapability csiVolumeCap = Csi.VolumeCapability.newBuilder()
+ .setAccessMode(Csi.VolumeCapability.AccessMode.newBuilder()
+ .setModeValue(cap.getAccessMode().ordinal())) // access mode
+ // TODO support block
+ .setMount(Csi.VolumeCapability.MountVolume.newBuilder()
+ // TODO support fsType
+ .setFsType("xfs") // fs type
+ .addAllMountFlags(cap.getMountFlags())) // mount flags
+ .build();
+ builder.setVolumeCapability(csiVolumeCap);
+ builder.setVolumeId(messageA.getVolumeId());
+ builder.setTargetPath(messageA.getTargetPath());
+ builder.setReadonly(messageA.getReadOnly());
+ builder.putAllNodePublishSecrets(messageA.getSecrets());
+ builder.putAllPublishInfo(messageA.getPublishContext());
+ builder.setStagingTargetPath(messageA.getStagingPath());
+ return builder.build();
+ }
+
+ @Override
+ public NodePublishVolumeRequest convertFrom(
+ Csi.NodePublishVolumeRequest messageB) throws YarnException {
+ Csi.VolumeCapability cap0 = messageB.getVolumeCapability();
+ ValidateVolumeCapabilitiesRequest.VolumeCapability cap =
+ new ValidateVolumeCapabilitiesRequest.VolumeCapability(
+ ValidateVolumeCapabilitiesRequest.AccessMode
+ .valueOf(cap0.getAccessMode().getMode().name()),
+ ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
+ cap0.getMount().getMountFlagsList());
+ return NodePublishVolumeRequest.newInstance(
+ messageB.getVolumeId(), messageB.getReadonly(),
+ messageB.getTargetPath(), messageB.getStagingTargetPath(),
+ cap, messageB.getPublishInfoMap(),
+ messageB.getNodePublishSecretsMap());
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java
similarity index 50%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java
index c4f042e..485237e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/GetPluginInfoResponseProtoTranslator.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/NodeUnpublishVolumeRequestProtoTranslator.java
@@ -18,27 +18,32 @@
package org.apache.hadoop.yarn.csi.translator;
import csi.v0.Csi;
-import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
- * Protobuf message translator for GetPluginInfoResponse and
- * Csi.GetPluginInfoResponse.
+ * This class helps to transform a YARN side NodeUnpublishVolumeRequest
+ * to corresponding CSI protocol message.
+ * @param <A> YARN NodeUnpublishVolumeRequest
+ * @param <B> CSI NodeUnpublishVolumeRequest
*/
-public class GetPluginInfoResponseProtoTranslator implements
- ProtoTranslator<GetPluginInfoResponse, Csi.GetPluginInfoResponse> {
+public class NodeUnpublishVolumeRequestProtoTranslator<A, B> implements
+ ProtoTranslator<NodeUnpublishVolumeRequest,
+ Csi.NodeUnpublishVolumeRequest> {
- @Override public Csi.GetPluginInfoResponse convertTo(
- GetPluginInfoResponse messageA) throws YarnException {
- return Csi.GetPluginInfoResponse.newBuilder()
- .setName(messageA.getDriverName())
- .setVendorVersion(messageA.getVersion())
+ @Override
+ public Csi.NodeUnpublishVolumeRequest convertTo(
+ NodeUnpublishVolumeRequest messageA) throws YarnException {
+ return Csi.NodeUnpublishVolumeRequest.newBuilder()
+ .setVolumeId(messageA.getVolumeId())
+ .setTargetPath(messageA.getTargetPath())
.build();
}
- @Override public GetPluginInfoResponse convertFrom(
- Csi.GetPluginInfoResponse messageB) throws YarnException {
- return GetPluginInfoResponse.newInstance(messageB.getName(),
- messageB.getVendorVersion());
+ @Override
+ public NodeUnpublishVolumeRequest convertFrom(
+ Csi.NodeUnpublishVolumeRequest messageB) throws YarnException {
+ return NodeUnpublishVolumeRequest
+ .newInstance(messageB.getVolumeId(), messageB.getTargetPath());
}
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
index 5eb76ff..1a7306f 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/translator/ProtoTranslatorFactory.java
@@ -18,8 +18,11 @@
package org.apache.hadoop.yarn.csi.translator;
import csi.v0.Csi;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetPluginInfoResponse;
/**
* Factory class to get desired proto transformer instance.
@@ -57,6 +60,15 @@ public final class ProtoTranslatorFactory {
} else if (yarnProto == ValidateVolumeCapabilitiesResponse.class
&& csiProto == Csi.ValidateVolumeCapabilitiesResponse.class) {
return new ValidationVolumeCapabilitiesResponseProtoTranslator();
+ } else if (yarnProto == NodePublishVolumeRequest.class
+ && csiProto == Csi.NodePublishVolumeRequest.class) {
+ return new NodePublishVolumeRequestProtoTranslator();
+ } else if (yarnProto == GetPluginInfoResponse.class
+ && csiProto == Csi.GetPluginInfoResponse.class) {
+ return new GetPluginInfoResponseProtoTranslator();
+ } else if (yarnProto == NodeUnpublishVolumeRequest.class
+ && csiProto == Csi.NodeUnpublishVolumeRequest.class) {
+ return new NodeUnpublishVolumeRequestProtoTranslator();
}
throw new IllegalArgumentException("A problem is found while processing"
+ " proto message translating. Unexpected message types,"
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
index 128240d..d6ee231 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestCsiAdaptorService.java
@@ -33,7 +33,7 @@ import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResp
import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ValidateVolumeCapabilitiesRequestPBImpl;
import org.apache.hadoop.yarn.client.NMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.csi.client.CsiClient;
+import org.apache.hadoop.yarn.csi.client.ICsiClientTest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.AfterClass;
@@ -81,13 +81,18 @@ public class TestCsiAdaptorService {
conf.setSocketAddr(
YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
address);
+ conf.set(
+ YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
+ "unix:///tmp/test-driver.scok");
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver", domainSocket);
+ service.init(conf);
+ service.start();
// inject a fake CSI client
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
- service.setCsiClient(new CsiClient() {
+ service.setCsiClient(new ICsiClientTest() {
@Override
public Csi.GetPluginInfoResponse getPluginInfo() {
return Csi.GetPluginInfoResponse.newBuilder()
@@ -103,7 +108,7 @@ public class TestCsiAdaptorService {
Assert.assertEquals("volume-id-0000123", request.getVolumeId());
Assert.assertEquals(1, request.getVolumeCapabilitiesCount());
Assert.assertEquals(Csi.VolumeCapability.AccessMode
- .newBuilder().setModeValue(5).build(),
+ .newBuilder().setModeValue(5).build(),
request.getVolumeCapabilities(0).getAccessMode());
Assert.assertTrue(request.getVolumeCapabilities(0).hasMount());
Assert.assertEquals(2, request.getVolumeCapabilities(0)
@@ -123,9 +128,6 @@ public class TestCsiAdaptorService {
}
});
- service.init(conf);
- service.start();
-
try (CsiAdaptorProtocolPBClientImpl client =
new CsiAdaptorProtocolPBClientImpl(1L, address, new Configuration())) {
ValidateVolumeCapabilitiesRequest request =
@@ -157,13 +159,18 @@ public class TestCsiAdaptorService {
conf.setSocketAddr(
YarnConfiguration.NM_CSI_ADAPTOR_PREFIX + "test-driver.address",
address);
+ conf.set(
+ YarnConfiguration.NM_CSI_DRIVER_PREFIX + "test-driver.endpoint",
+ "unix:///tmp/test-driver.scok");
CsiAdaptorProtocolService service =
new CsiAdaptorProtocolService("test-driver", domainSocket);
+ service.init(conf);
+ service.start();
// inject a fake CSI client
// this client validates if the ValidateVolumeCapabilitiesRequest
// is integrity, and then reply a fake response
- service.setCsiClient(new CsiClient() {
+ service.setCsiClient(new ICsiClientTest() {
@Override
public Csi.GetPluginInfoResponse getPluginInfo() {
return Csi.GetPluginInfoResponse.newBuilder()
@@ -199,9 +206,6 @@ public class TestCsiAdaptorService {
}
});
- service.init(conf);
- service.start();
-
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
CsiAdaptorProtocol adaptorClient = NMProxy
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java
new file mode 100644
index 0000000..5f7fa87
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/adaptor/TestNodePublishVolumeRequest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.csi.adaptor;
+
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.NodePublishVolumeRequestPBImpl;
+import org.apache.hadoop.yarn.proto.CsiAdaptorProtos;
+import
org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.AccessMode;
+import
org.apache.hadoop.yarn.proto.CsiAdaptorProtos.VolumeCapability.VolumeType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * UT for NodePublishVolumeRequest.
+ */
+public class TestNodePublishVolumeRequest {
+
+ @Test
+ public void testPBRecord() {
+ CsiAdaptorProtos.VolumeCapability capability =
+ CsiAdaptorProtos.VolumeCapability.newBuilder()
+ .setAccessMode(AccessMode.MULTI_NODE_READER_ONLY)
+ .setVolumeType(VolumeType.FILE_SYSTEM)
+ .build();
+ CsiAdaptorProtos.NodePublishVolumeRequest proto =
+ CsiAdaptorProtos.NodePublishVolumeRequest.newBuilder()
+ .setReadonly(false)
+ .setVolumeId("test-vol-000001")
+ .setTargetPath("/mnt/data")
+ .setStagingTargetPath("/mnt/staging")
+ .setVolumeCapability(capability)
+ .build();
+
+ NodePublishVolumeRequestPBImpl pbImpl =
+ new NodePublishVolumeRequestPBImpl(proto);
+ Assert.assertEquals("test-vol-000001", pbImpl.getVolumeId());
+ Assert.assertEquals("/mnt/data", pbImpl.getTargetPath());
+ Assert.assertEquals("/mnt/staging", pbImpl.getStagingPath());
+ Assert.assertFalse(pbImpl.getReadOnly());
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
similarity index 58%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
index d31c0c9..2f150cb 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/main/java/org/apache/hadoop/yarn/csi/client/CsiClient.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/src/test/java/org/apache/hadoop/yarn/csi/client/ICsiClientTest.java
@@ -15,29 +15,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.csi.client;
import csi.v0.Csi;
-import csi.v0.Csi.GetPluginInfoResponse;
import java.io.IOException;
/**
- * General interface for a CSI client. This interface defines all APIs
- * that CSI spec supports, including both identity/controller/node service
- * APIs.
+ * This interface is used only in testing. It gives default implementation
+ * of all methods.
*/
-public interface CsiClient {
+public interface ICsiClientTest extends CsiClient {
+
+ @Override
+ default Csi.GetPluginInfoResponse getPluginInfo()
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ default Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
+ Csi.ValidateVolumeCapabilitiesRequest request) throws IOException {
+ return null;
+ }
- /**
- * Gets some basic info about the CSI plugin, including the driver name,
- * version and optionally some manifest info.
- * @return {@link GetPluginInfoResponse}
- * @throws IOException when unable to get plugin info from the driver.
- */
- GetPluginInfoResponse getPluginInfo() throws IOException;
+ @Override
+ default Csi.NodePublishVolumeResponse nodePublishVolume(
+ Csi.NodePublishVolumeRequest request) throws IOException {
+ return null;
+ }
- Csi.ValidateVolumeCapabilitiesResponse validateVolumeCapabilities(
- Csi.ValidateVolumeCapabilitiesRequest request) throws IOException;
+ @Override
+ default Csi.NodeUnpublishVolumeResponse nodeUnpublishVolume(
+ Csi.NodeUnpublishVolumeRequest request) throws IOException {
+ return null;
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index faa695c..0565885 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -75,6 +75,10 @@ public interface Container extends
EventHandler<ContainerEvent> {
void setWorkDir(String workDir);
+ String getCsiVolumesRootDir();
+
+ void setCsiVolumesRootDir(String volumesRootDir);
+
String getLogDir();
void setLogDir(String logDir);
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index e8f4791..1d6ba2e 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -172,6 +172,7 @@ public class ContainerImpl implements Container {
private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
private SlidingWindowRetryPolicy retryPolicy;
+ private String csiVolumesRootDir;
private String workDir;
private String logDir;
private String host;
@@ -936,6 +937,16 @@ public class ContainerImpl implements Container {
this.workDir = workDir;
}
+ @Override
+ public String getCsiVolumesRootDir() {
+ return csiVolumesRootDir;
+ }
+
+ @Override
+ public void setCsiVolumesRootDir(String volumesRootDir) {
+ this.csiVolumesRootDir = volumesRootDir;
+ }
+
private void clearIpAndHost() {
LOG.info("{} clearing ip and host", containerId);
this.ips = null;
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index f06040e..9b6fae7 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -250,6 +250,10 @@ public class ContainerLaunch implements Callable<Integer> {
Path containerWorkDir = deriveContainerWorkDir();
recordContainerWorkDir(containerID, containerWorkDir.toString());
+ // Select a root dir for all csi volumes for the container
+ Path csiVolumesRoot = deriveCsiVolumesRootDir();
+ recordContainerCsiVolumesRootDir(containerID, csiVolumesRoot.toString());
+
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
// pid file should be in nm private dir so that it is not
// accessible by users
@@ -358,6 +362,7 @@ public class ContainerLaunch implements Callable<Integer> {
.setUser(user)
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
+ .setContainerCsiVolumesRootDir(csiVolumesRoot)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.setFilecacheDirs(filecacheDirs)
@@ -388,6 +393,27 @@ public class ContainerLaunch implements Callable<Integer> {
return ret;
}
+ /**
+ * Volumes mount point root:
+ * ${YARN_LOCAL_DIR}/usercache/${user}/filecache/csiVolumes/app/container
+ * CSI volumes may creates the mount point with different permission bits.
+ * If we create the volume mount under container work dir, it may
+ * mess up the existing permission structure, which is restricted by
+ * linux container executor. So we put all volume mounts under a same
+ * root dir so it is easier cleanup.
+ **/
+ private Path deriveCsiVolumesRootDir() throws IOException {
+ final String containerVolumePath =
+ ContainerLocalizer.USERCACHE + Path.SEPARATOR
+ + container.getUser() + Path.SEPARATOR
+ + ContainerLocalizer.FILECACHE + Path.SEPARATOR
+ + ContainerLocalizer.CSI_VOLIUME_MOUNTS_ROOT + Path.SEPARATOR
+ + app.getAppId().toString() + Path.SEPARATOR
+ + container.getContainerId().toString();
+ return dirsHandler.getLocalPathForWrite(containerVolumePath,
+ LocalDirAllocator.SIZE_UNKNOWN, false);
+ }
+
private Path deriveContainerWorkDir() throws IOException {
final String containerWorkDirPath =
@@ -1752,6 +1778,12 @@ public class ContainerLaunch implements
Callable<Integer> {
}
}
+ private void recordContainerCsiVolumesRootDir(ContainerId containerId,
+ String volumesRoot) throws IOException {
+ container.setCsiVolumesRootDir(volumesRoot);
+ // TODO persistent to the NM store...
+ }
+
protected Path getContainerWorkDir() throws IOException {
String containerWorkDir = container.getWorkDir();
if (containerWorkDir == null
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
index 4970c7c..7fc386d 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java
@@ -24,7 +24,10 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import
org.apache.hadoop.yarn.api.impl.pb.client.CsiAdaptorProtocolPBClientImpl;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommand;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerCommandExecutor;
@@ -35,7 +38,9 @@ import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.docker.DockerVolumeCommand;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi.ContainerVolumePublisher;
import org.apache.hadoop.yarn.util.DockerClientConfigHandler;
+import org.apache.hadoop.yarn.util.csi.CsiConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,6 +74,7 @@ import
org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -76,6 +82,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -262,6 +269,7 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
private Configuration conf;
private Context nmContext;
private DockerClient dockerClient;
+ private Map<String, CsiAdaptorProtocol> csiClients = new HashMap<>();
private PrivilegedOperationExecutor privilegedOperationExecutor;
private String defaultImageName;
private Set<String> allowedNetworks = new HashSet<>();
@@ -363,6 +371,9 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
throw new ContainerExecutionException(message);
}
+ // initialize csi adaptors if necessary
+ initiateCsiClients(conf);
+
privilegedContainersAcl = new AccessControlList(conf.getTrimmed(
YarnConfiguration.NM_DOCKER_PRIVILEGED_CONTAINERS_ACL,
YarnConfiguration.DEFAULT_NM_DOCKER_PRIVILEGED_CONTAINERS_ACL));
@@ -398,6 +409,10 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
YarnConfiguration.NM_DOCKER_DEFAULT_TMPFS_MOUNTS)));
}
+ public Map<String, CsiAdaptorProtocol> getCsiClients() {
+ return csiClients;
+ }
+
@Override
public boolean isRuntimeRequested(Map<String, String> env) {
return isDockerContainerRequested(conf, env);
@@ -942,6 +957,18 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
}
}
+ ContainerVolumePublisher publisher = new ContainerVolumePublisher(
+ container, container.getCsiVolumesRootDir(), this);
+ try {
+ Map<String, String> volumeMounts = publisher.publishVolumes();
+ volumeMounts.forEach((local, remote) ->
+ runCommand.addReadWriteMountLocation(local, remote));
+ } catch (YarnException | IOException e) {
+ throw new ContainerExecutionException(
+ "Container requests for volume resource but we are failed"
+ + " to publish volumes on this node");
+ }
+
if (environment.containsKey(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)) {
String[] tmpfsMounts = environment.get(ENV_DOCKER_CONTAINER_TMPFS_MOUNTS)
.split(",");
@@ -1442,6 +1469,14 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
ContainerExecutor.Signal signal) throws ContainerExecutionException {
Container container = ctx.getContainer();
+ ContainerVolumePublisher publisher = new ContainerVolumePublisher(
+ container, container.getCsiVolumesRootDir(), this);
+ try {
+ publisher.unpublishVolumes();
+ } catch (YarnException | IOException e) {
+ throw new ContainerExecutionException(e);
+ }
+
// Only need to check whether the container was asked to be privileged.
// If the container had failed the permissions checks upon launch, it
// would have never been launched and thus we wouldn't be here
@@ -1537,4 +1572,33 @@ public class DockerLinuxContainerRuntime implements
LinuxContainerRuntime {
}
}
+ /**
+ * Initiate CSI clients to talk to the CSI adaptors on this node and
+ * cache the clients for easier fetch.
+ * @param config configuration
+ * @throws ContainerExecutionException
+ */
+ private void initiateCsiClients(Configuration config)
+ throws ContainerExecutionException {
+ String[] driverNames = CsiConfigUtils.getCsiDriverNames(config);
+ if (driverNames != null && driverNames.length > 0) {
+ for (String driverName : driverNames) {
+ try {
+ // find out the adaptors service address
+ InetSocketAddress adaptorServiceAddress =
+ CsiConfigUtils.getCsiAdaptorAddressForDriver(driverName, config);
+ LOG.info("Initializing a csi-adaptor-client for csi-adaptor {},"
+ + " csi-driver {}", adaptorServiceAddress.toString(),
driverName);
+ CsiAdaptorProtocolPBClientImpl client =
+ new CsiAdaptorProtocolPBClientImpl(1L, adaptorServiceAddress,
+ config);
+ csiClients.put(driverName, client);
+ } catch (IOException e1) {
+ throw new ContainerExecutionException(e1.getMessage());
+ } catch (YarnException e2) {
+ throw new ContainerExecutionException(e2.getMessage());
+ }
+ }
+ }
+ }
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index c5b8625..859f0c3 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -101,6 +101,7 @@ public class ContainerLocalizer {
new FsPermission((short)0710);
private static final FsPermission USERCACHE_FOLDER_PERMS =
new FsPermission((short) 0755);
+ public static final String CSI_VOLIUME_MOUNTS_ROOT = "csivolumes";
private final String user;
private final String appId;
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java
new file mode 100644
index 0000000..78f2d2d
--- /dev/null
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.CsiAdaptorProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.NodePublishVolumeRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.NodeUnpublishVolumeRequest;
+import
org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData;
+import
org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Publish/un-publish CSI volumes on node manager.
+ */
+public class ContainerVolumePublisher {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerVolumePublisher.class);
+
+ private final Container container;
+ private final String localMountRoot;
+ private final DockerLinuxContainerRuntime runtime;
+
+ public ContainerVolumePublisher(Container container, String localMountRoot,
+ DockerLinuxContainerRuntime runtime) {
+ LOG.info("Initiate container volume publisher, containerID={},"
+ + " volume local mount rootDir={}",
+ container.getContainerId().toString(), localMountRoot);
+ this.container = container;
+ this.localMountRoot = localMountRoot;
+ this.runtime = runtime;
+ }
+
+ /**
+ * It first discovers the volume info from container resource;
+ * then negotiates with CSI driver adaptor to publish the volume on this
+ * node manager, on a specific directory under container's work dir;
+ * and then map the local mounted directory to volume target mount in
+ * the docker container.
+ *
+ * CSI volume publish is a two phase work, by reaching up here
+ * we can assume the 1st phase is done on the RM side, which means
+ * YARN is already called the controller service of csi-driver
+ * to publish the volume; here we only need to call the node service of
+ * csi-driver to publish the volume on this local node manager.
+ *
+ * @return a map where each key is the local mounted path on current node,
+ * and value is the remote mount path on the container.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public Map<String, String> publishVolumes() throws YarnException,
+ IOException {
+ LOG.info("publishing volumes");
+ Map<String, String> volumeMounts = new HashMap<>();
+ List<VolumeMetaData> volumes = getVolumes();
+ LOG.info("Found {} volumes to be published on this node", volumes.size());
+ for (VolumeMetaData volume : volumes) {
+ Map<String, String> bindings = publishVolume(volume);
+ if (bindings != null && !bindings.isEmpty()) {
+ volumeMounts.putAll(bindings);
+ }
+ }
+ return volumeMounts;
+ }
+
+ public void unpublishVolumes() throws YarnException, IOException {
+ LOG.info("Un-publishing Volumes");
+ List<VolumeMetaData> volumes = getVolumes();
+ LOG.info("Volumes to un-publish {}", volumes.size());
+ for (VolumeMetaData volume : volumes) {
+ this.unpublishVolume(volume);
+ }
+ }
+
+ private File getLocalVolumeMountPath(
+ String containerWorkDir, String volumeId) {
+ return new File(containerWorkDir, volumeId + "_mount");
+ }
+
+ private File getLocalVolumeStagingPath(
+ String containerWorkDir, String volumeId) {
+ return new File(containerWorkDir, volumeId + "_staging");
+ }
+
+ private List<VolumeMetaData> getVolumes() throws InvalidVolumeException {
+ List<VolumeMetaData> volumes = new ArrayList<>();
+ Resource containerResource = container.getResource();
+ if (containerResource != null) {
+ for (ResourceInformation resourceInformation :
+ containerResource.getAllResourcesListCopy()) {
+ if (resourceInformation.getTags().contains("system:csi-volume")) {
+ volumes.addAll(VolumeMetaData.fromResource(resourceInformation));
+ }
+ }
+ }
+ if (volumes.size() > 0) {
+ LOG.info("Total number of volumes require provisioning is {}",
+ volumes.size());
+ }
+ return volumes;
+ }
+
+ private Map<String, String> publishVolume(VolumeMetaData volume)
+ throws IOException, YarnException {
+ Map<String, String> bindVolumes = new HashMap<>();
+ // compose a local mount for CSI volume with the container ID
+ File localMount = getLocalVolumeMountPath(
+ localMountRoot, volume.getVolumeId().toString());
+ File localStaging = getLocalVolumeStagingPath(
+ localMountRoot, volume.getVolumeId().toString());
+ LOG.info("Volume {}, local mount path: {}, local staging path {}",
+ volume.getVolumeId().toString(), localMount, localStaging);
+
+ NodePublishVolumeRequest publishRequest = NodePublishVolumeRequest
+ .newInstance(volume.getVolumeId().getId(), // volume Id
+ false, // read only flag
+ localMount.getAbsolutePath(), // target path
+ localStaging.getAbsolutePath(), // staging path
+ new ValidateVolumeCapabilitiesRequest.VolumeCapability(
+ ValidateVolumeCapabilitiesRequest
+ .AccessMode.SINGLE_NODE_WRITER,
+ ValidateVolumeCapabilitiesRequest.VolumeType.FILE_SYSTEM,
+ ImmutableList.of()), // capability
+ ImmutableMap.of(), // publish context
+ ImmutableMap.of()); // secrets
+
+ // make sure the volume is a known type
+ if (runtime.getCsiClients().get(volume.getDriverName()) == null) {
+ throw new YarnException("No csi-adaptor is found that can talk"
+ + " to csi-driver " + volume.getDriverName());
+ }
+
+ // publish volume to node
+ LOG.info("Publish volume on NM, request {}",
+ publishRequest.toString());
+ runtime.getCsiClients().get(volume.getDriverName())
+ .nodePublishVolume(publishRequest);
+ // once succeed, bind the container to this mount
+ String containerMountPath = volume.getMountPoint();
+ bindVolumes.put(localMount.getAbsolutePath(), containerMountPath);
+ return bindVolumes;
+ }
+
+ private void unpublishVolume(VolumeMetaData volume)
+ throws YarnException, IOException {
+ CsiAdaptorProtocol csiClient =
+ runtime.getCsiClients().get(volume.getDriverName());
+ if (csiClient == null) {
+ throw new YarnException(
+ "No csi-adaptor is found that can talk"
+ + " to csi-driver " + volume.getDriverName());
+ }
+
+ // When container is launched, the container work dir is memorized,
+ // and that is also the dir we mount the volume to.
+ File localMount = getLocalVolumeMountPath(container.getCsiVolumesRootDir(),
+ volume.getVolumeId().toString());
+ if (!localMount.exists()) {
+ LOG.info("Local mount {} no longer exist, skipping cleaning"
+ + " up the volume", localMount.getAbsolutePath());
+ return;
+ }
+ NodeUnpublishVolumeRequest unpublishRequest =
+ NodeUnpublishVolumeRequest.newInstance(
+ volume.getVolumeId().getId(), // volume id
+ localMount.getAbsolutePath()); // target path
+
+ // un-publish volume from node
+ LOG.info("Un-publish volume {}, request {}",
+ volume.getVolumeId().toString(), unpublishRequest.toString());
+ csiClient.nodeUnpublishVolume(unpublishRequest);
+ }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java
similarity index 62%
copy from
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
copy to
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java
index 9dcb8a7..5b894b8 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/YarnCsiAdaptor.proto
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/package-info.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,19 +16,7 @@
* limitations under the License.
*/
-option java_package = "org.apache.hadoop.yarn.proto";
-option java_outer_classname = "CsiAdaptorProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-package hadoop.yarn;
-
-import "yarn_csi_adaptor.proto";
-
-service CsiAdaptorProtocolService {
-
- rpc getPluginInfo (GetPluginInfoRequest)
- returns (GetPluginInfoResponse);
-
- rpc validateVolumeCapacity (ValidateVolumeCapabilitiesRequest)
- returns (ValidateVolumeCapabilitiesResponse);
-}
+/**
+ * CSI volumes.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.volume.csi;
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java
index 444a1e0..01d34f1 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerStartContext.java
@@ -45,6 +45,7 @@ public final class ContainerStartContext {
private final String user;
private final String appId;
private final Path containerWorkDir;
+ private final Path csiVolumesRootDir;
private final List<String> localDirs;
private final List<String> logDirs;
private final List<String> filecacheDirs;
@@ -64,6 +65,7 @@ public final class ContainerStartContext {
private String user;
private String appId;
private Path containerWorkDir;
+ private Path csiVolumesRoot;
private List<String> localDirs;
private List<String> logDirs;
private List<String> filecacheDirs;
@@ -118,6 +120,11 @@ public final class ContainerStartContext {
return this;
}
+ public Builder setContainerCsiVolumesRootDir(Path csiVolumesRootDir) {
+ this.csiVolumesRoot = csiVolumesRootDir;
+ return this;
+ }
+
public Builder setContainerWorkDir(Path containerWorkDir) {
this.containerWorkDir = containerWorkDir;
return this;
@@ -188,6 +195,7 @@ public final class ContainerStartContext {
this.containerLogDirs = builder.containerLogDirs;
this.userFilecacheDirs = builder.userFilecacheDirs;
this.applicationLocalDirs = builder.applicationLocalDirs;
+ this.csiVolumesRootDir = builder.csiVolumesRoot;
}
public Container getContainer() {
@@ -262,4 +270,8 @@ public final class ContainerStartContext {
public List<String> getApplicationLocalDirs() {
return Collections.unmodifiableList(this.applicationLocalDirs);
}
+
+ public Path getCsiVolumesRootDir() {
+ return this.csiVolumesRootDir;
+ }
}
\ No newline at end of file
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index c686000..f1b39bd 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -174,6 +174,16 @@ public class MockContainer implements Container {
}
@Override
+ public String getCsiVolumesRootDir() {
+ return null;
+ }
+
+ @Override
+ public void setCsiVolumesRootDir(String volumesRootDir) {
+
+ }
+
+ @Override
public String getLogDir() {
return null;
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
index 82a4acb..068e8a4 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/lifecycle/VolumeImpl.java
@@ -146,6 +146,8 @@ public class VolumeImpl implements Volume {
@Override
public VolumeState transition(VolumeImpl volume,
VolumeEvent volumeEvent) {
+ // Some of CSI driver implementation does't provide the capability
+ // to validate volumes. Skip this for now.
try {
// this call could cross node, we should keep the message tight
// TODO we should parse the capability from volume resource spec
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]