Repository: samza Updated Branches: refs/heads/master 9d52e996b -> e4cfeebeb
SAMZA-1143; Universal config support for localized resource More details in https://issues.apache.org/jira/browse/SAMZA-1143 Tests: ./gradlew clean check successful and all unit tests passed Author: Fred Ji <[email protected]> Reviewers: Jagadish <[email protected]> Closes #90 from fredji97/universalLocalizer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e4cfeebe Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e4cfeebe Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e4cfeebe Branch: refs/heads/master Commit: e4cfeebebf3ec94ca3b5aca2e4aed10e77db38ae Parents: 9d52e99 Author: Fred Ji <[email protected]> Authored: Mon Mar 27 10:34:12 2017 -0700 Committer: vjagadish1989 <[email protected]> Committed: Mon Mar 27 10:34:12 2017 -0700 ---------------------------------------------------------------------- .../samza/job/yarn/FileSystemImplConfig.java | 86 +++++++++ .../samza/job/yarn/LocalizerResourceConfig.java | 102 +++++++++++ .../job/yarn/LocalizerResourceException.java | 72 ++++++++ .../samza/job/yarn/LocalizerResourceMapper.java | 101 +++++++++++ .../job/yarn/YarnClusterResourceManager.java | 8 +- .../samza/job/yarn/YarnContainerRunner.java | 26 ++- .../job/yarn/YarnResourceManagerFactory.java | 1 - .../apache/samza/job/yarn/ClientHelper.scala | 21 ++- .../apache/samza/job/yarn/YarnJobFactory.scala | 11 +- .../job/yarn/TestFileSystemImplConfig.java | 75 ++++++++ .../job/yarn/TestLocalizerResourceConfig.java | 125 +++++++++++++ .../job/yarn/TestLocalizerResourceMapper.java | 174 +++++++++++++++++++ .../samza/job/yarn/TestYarnJobFactory.java | 48 +++++ .../samza/job/yarn/TestYarnJobFactory.scala | 45 ----- 14 files changed, 836 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java new file mode 100644 index 0000000..8e79104 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/FileSystemImplConfig.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang.StringUtils; +import org.apache.samza.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * FileSystemImplConfig is intended to manage the Samza config for fs.<scheme>impl. + * e.g. fs.http.impl + */ +public class FileSystemImplConfig { + private static final Logger log = LoggerFactory.getLogger(FileSystemImplConfig.class); + private static final String FS_IMPL_PREFIX = "fs."; + private static final String FS_IMPL_SUFFIX = ".impl"; + private static final String FS_IMPL = "fs.%s.impl"; + + private final Config config; + + public FileSystemImplConfig(final Config config) { + if (null == config) { + throw new IllegalArgumentException("config cannot be null"); + } + this.config = config; + } + + /** + * Get all schemes + * @return List of schemes in strings + */ + public List<String> getSchemes() { + Config subConfig = config.subset(FS_IMPL_PREFIX, true); + List<String> schemes = new ArrayList<String>(); + for (String key : subConfig.keySet()) { + if (key.endsWith(FS_IMPL_SUFFIX)) { + schemes.add(key.substring(0, key.length() - FS_IMPL_SUFFIX.length())); + } + } + return schemes; + } + + /** + * Get the fs.<scheme>impl as the config key from scheme + * @param scheme scheme name, such as http, hdfs, myscheme + * @return fs.<scheme>impl + */ + public String getFsImplKey(final String scheme) { + String fsImplKey = String.format(FS_IMPL, scheme); + return fsImplKey; + } + + /** + * Get the class name corresponding for the given scheme + * @param scheme scheme name, such as http, hdfs, myscheme + * @return full scoped class name for the file system for <scheme> + */ + public String getFsImplClassName(final String scheme) { + String fsImplKey = getFsImplKey(scheme); + String fsImplClassName = config.get(fsImplKey); + if (StringUtils.isEmpty(fsImplClassName)) { + throw new LocalizerResourceException(fsImplKey + " does not have configured class implementation"); + } + return fsImplClassName; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java new file mode 100644 index 0000000..ca94783 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceConfig.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.samza.config.Config; + +/** + * LocalizerResourceConfig is intended to manage/fetch the config values + * for the yarn localizer resource(s) from the configuration. + * + * There are 4 config values + * yarn.resources.<resourceName>.path + * (Required) The path for fetching the resource for localization, + * e.g. http://hostname.com/test. + * yarn.resources.<resourceName>.local.name + * (Optional) The local name used for the localized resource. + * If not set, the default one will be the <resourceName> from the config key. + * yarn.resources.<resourceName>.local.type + * (Optional) The value value is a string format of {@link LocalResourceType}: + * ARCHIVE, FILE, PATTERN. + * If not set, the default value is FILE. + * yarn.resources.<resourceName>.local.visibility + * (Optional) The valid value is a string format of {@link LocalResourceVisibility}: + * PUBLIC, PRIVATE, or APPLICATION. + * If not set, the default value is is APPLICATION. + */ +public class LocalizerResourceConfig { + private static final String RESOURCE_PREFIX = "yarn.resources."; + private static final String PATH_SUFFIX = ".path"; + private static final String RESOURCE_PATH = "yarn.resources.%s.path"; + private static final String RESOURCE_LOCAL_NAME = "yarn.resources.%s.local.name"; + private static final String RESOURCE_LOCAL_TYPE = "yarn.resources.%s.local.type"; + private static final String RESOURCE_LOCAL_VISIBILITY = "yarn.resources.%s.local.visibility"; + private static final String DEFAULT_RESOURCE_LOCAL_TYPE = "FILE"; + private static final String DEFAULT_RESOURCE_LOCAL_VISIBILITY = "APPLICATION"; + + private final Config config; + + public LocalizerResourceConfig(final Config config) { + if (null == config) { + throw new IllegalArgumentException("config cannot be null"); + } + this.config = config; + } + + public List<String> getResourceNames() { + Config subConfig = config.subset(RESOURCE_PREFIX, true); + List<String> resourceNames = new ArrayList<String>(); + for (String key : subConfig.keySet()) { + if (key.endsWith(PATH_SUFFIX)) { + resourceNames.add(key.substring(0, key.length() - PATH_SUFFIX.length())); + } + } + return resourceNames; + } + + public Path getResourcePath(final String resourceName) { + String pathStr = config.get(String.format(RESOURCE_PATH, resourceName)); + if (StringUtils.isEmpty(pathStr)) { + throw new LocalizerResourceException("resource path is required but not defined in config for resource " + resourceName); + } + return new Path(pathStr); + } + + public LocalResourceType getResourceLocalType(final String resourceName) { + String typeStr = config.get(String.format(RESOURCE_LOCAL_TYPE, resourceName), DEFAULT_RESOURCE_LOCAL_TYPE); + return LocalResourceType.valueOf(StringUtils.upperCase(typeStr)); + } + + public LocalResourceVisibility getResourceLocalVisibility(final String resourceName) { + String visibilityStr = config.get(String.format(RESOURCE_LOCAL_VISIBILITY, resourceName), DEFAULT_RESOURCE_LOCAL_VISIBILITY); + return LocalResourceVisibility.valueOf(StringUtils.upperCase(visibilityStr)); + } + + public String getResourceLocalName(final String resourceName) { + String name = config.get(String.format(RESOURCE_LOCAL_NAME, resourceName), resourceName); + return name; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java new file mode 100644 index 0000000..0df6903 --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceException.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +public class LocalizerResourceException extends RuntimeException { + + /** + * Constructs an {@code LocalizerResourceException} with {@code null} + * as its error detail message. + */ + public LocalizerResourceException() { + super(); + } + + /** + * Constructs an {@code LocalizerResourceException} with the specified detail message. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public LocalizerResourceException(String message) { + super(message); + } + + /** + * Constructs an {@code LocalizerResourceException} with the specified detail message + * and cause. + * + * @param message + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + * + * @param cause + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public LocalizerResourceException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs an {@code LocalizerResourceException} with the specified cause and a + * detail message of {@code (cause==null ? null : cause.toString())} + * + * @param cause + * The cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + */ + public LocalizerResourceException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java new file mode 100644 index 0000000..6dddb0a --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/LocalizerResourceMapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A universal approach to generate local resource map which can be put in ContainerLaunchContext directly + */ +public class LocalizerResourceMapper { + private static final Logger log = LoggerFactory.getLogger(LocalizerResourceMapper.class); + + private final YarnConfiguration yarnConfiguration; //yarn configurations + private final LocalizerResourceConfig resourceConfig; + private final Map<String, LocalResource> localResourceMap; + + public LocalizerResourceMapper(LocalizerResourceConfig resourceConfig, YarnConfiguration yarnConfiguration) { + this.yarnConfiguration = yarnConfiguration; + this.resourceConfig = resourceConfig; + this.localResourceMap = buildResourceMapping(); + } + + private Map<String, LocalResource> buildResourceMapping() { + ImmutableMap.Builder<String, LocalResource> localResourceMapBuilder = ImmutableMap.builder(); + + List<String> resourceNames = resourceConfig.getResourceNames(); + for (String resourceName : resourceNames) { + String resourceLocalName = resourceConfig.getResourceLocalName(resourceName); + LocalResourceType resourceType = resourceConfig.getResourceLocalType(resourceName); + LocalResourceVisibility resourceVisibility = resourceConfig.getResourceLocalVisibility(resourceName); + Path resourcePath = resourceConfig.getResourcePath(resourceName); + + LocalResource localResource = createLocalResource(resourcePath, resourceType, resourceVisibility); + + localResourceMapBuilder.put(resourceLocalName, localResource); + log.info("preparing local resource: {}", resourceLocalName); + } + + return localResourceMapBuilder.build(); + } + + private LocalResource createLocalResource(Path resourcePath, LocalResourceType resourceType, LocalResourceVisibility resourceVisibility) { + LocalResource localResource = Records.newRecord(LocalResource.class); + URL resourceUrl = ConverterUtils.getYarnUrlFromPath(resourcePath); + try { + FileStatus resourceFileStatus = resourcePath.getFileSystem(yarnConfiguration).getFileStatus(resourcePath); + + if (null == resourceFileStatus) { + throw new LocalizerResourceException("Check getFileStatus implementation. getFileStatus gets unexpected null for resourcePath " + resourcePath); + } + + localResource.setResource(resourceUrl); + log.info("setLocalizerResource for {}", resourceUrl); + localResource.setSize(resourceFileStatus.getLen()); + localResource.setTimestamp(resourceFileStatus.getModificationTime()); + localResource.setType(resourceType); + localResource.setVisibility(resourceVisibility); + return localResource; + } catch (IOException ioe) { + log.error("IO Exception when accessing the resource file status from the filesystem: " + resourcePath, ioe); + throw new LocalizerResourceException("IO Exception when accessing the resource file status from the filesystem: " + resourcePath); + } + + } + + public Map<String, LocalResource> getResourceMap() { + return ImmutableMap.copyOf(localResourceMap); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index c1b71bb..04c78be 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -36,12 +36,10 @@ import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.YarnConfig; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.CommandBuilder; -import org.apache.samza.job.yarn.YarnContainer; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.util.hadoop.HttpFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -120,6 +118,12 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement hConfig = new YarnConfiguration(); hConfig.set("fs.http.impl", HttpFileSystem.class.getName()); + // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration + FileSystemImplConfig fsImplConfig = new FileSystemImplConfig(config); + fsImplConfig.getSchemes().forEach( + scheme -> hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme)) + ); + MetricsRegistryMap registry = new MetricsRegistryMap(); metrics = new SamzaAppMasterMetrics(config, samzaAppState, registry); http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java index 181102b..c45fc7f 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnContainerRunner.java @@ -43,10 +43,12 @@ import org.apache.samza.job.CommandBuilder; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; /** * A Helper class to run container processes on Yarn. This encapsulates quite a bit of YarnContainer @@ -112,11 +114,11 @@ public class YarnContainerRunner { log.info("Samza FWK path: " + command + "; env=" + env); - Path path = new Path(yarnConfig.getPackagePath()); - log.info("Starting container ID {} using package path {}", samzaContainerId, path); + Path packagePath = new Path(yarnConfig.getPackagePath()); + log.info("Starting container ID {} using package path {}", samzaContainerId, packagePath); startContainer( - path, + packagePath, container, env, getFormattedCommand( @@ -151,6 +153,8 @@ public class YarnContainerRunner { log.info("starting container {} {} {} {}", new Object[]{packagePath, container, env, cmd}); + // TODO: SAMZA-1144 remove the customized approach for package resource and use the common one. + // But keep it now for backward compatibility. // set the local package so that the containers and app master are provisioned with it LocalResource packageResource = Records.newRecord(LocalResource.class); URL packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath); @@ -163,6 +167,7 @@ public class YarnContainerRunner { } packageResource.setResource(packageUrl); + log.info("set package Resource in YarnContainerRunner for {}", packageUrl); packageResource.setSize(fileStatus.getLen()); packageResource.setTimestamp(fileStatus.getModificationTime()); packageResource.setType(LocalResourceType.ARCHIVE); @@ -190,13 +195,20 @@ public class YarnContainerRunner { throw new SamzaContainerLaunchException("IO Exception when writing credentials to output buffer"); } + Map<String, LocalResource> localResourceMap = new HashMap<>(); + localResourceMap.put("__package", packageResource); + + // include the resources from the universal resource configurations + LocalizerResourceMapper resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), yarnConfiguration); + localResourceMap.putAll(resourceMapper.getResourceMap()); + ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class); context.setEnvironment(env); context.setTokens(allTokens.duplicate()); context.setCommands(new ArrayList<String>() {{add(cmd);}}); - context.setLocalResources(Collections.singletonMap("__package", packageResource)); + context.setLocalResources(localResourceMap); - log.debug("setting package to {}", packageResource); + log.debug("setting localResourceMap to {}", localResourceMap); log.debug("setting context to {}", context); StartContainerRequest startContainerRequest = Records.newRecord(StartContainerRequest.class); http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java index 988a8e8..3f9a84d 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnResourceManagerFactory.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.samza.job.yarn; import org.apache.samza.clustermanager.ClusterResourceManager; http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index 6c63b63..e5aafbb 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -20,12 +20,12 @@ package org.apache.samza.job.yarn import org.apache.hadoop.fs.permission.FsPermission -import org.apache.samza.config.{JobConfig, Config, YarnConfig} -import org.apache.samza.coordinator.stream.{CoordinatorStreamWriter} +import org.apache.samza.config.{Config, JobConfig, YarnConfig} +import org.apache.samza.coordinator.stream.CoordinatorStreamWriter import org.apache.samza.coordinator.stream.messages.SetConfig import scala.collection.JavaConversions._ -import scala.collection.{Map} +import scala.collection.Map import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -140,6 +140,8 @@ class ClientHelper(conf: Configuration) extends Logging { case None => } + // TODO: remove the customized approach for package resource and use the common one. + // But keep it now for backward compatibility. // set the local package so that the containers and app master are provisioned with it val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath) val fs = packagePath.getFileSystem(conf) @@ -167,6 +169,17 @@ class ClientHelper(conf: Configuration) extends Logging { val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]() localResources += "__package" -> packageResource + + // include the resources from the universal resource configurations + try { + val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf)) + localResources ++= resourceMapper.getResourceMap + } catch { + case e: LocalizerResourceException => { + throw new SamzaException("Exception during resource mapping from config. ", e) + } + } + if (UserGroupInformation.isSecurityEnabled()) { validateJobConfig(config) @@ -187,6 +200,8 @@ class ClientHelper(conf: Configuration) extends Logging { coordinatorStreamWriter.stop() } + // prepare all local resources for localizer + info("localResources is: %s" format localResources) containerCtx.setLocalResources(localResources) info("set local resources on application master for %s" format appId.get) http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala index 625d3bb..f057594 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala @@ -25,8 +25,10 @@ import org.apache.samza.job.StreamJobFactory import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.samza.config.Config import org.apache.samza.util.hadoop.HttpFileSystem +import org.apache.samza.util.Logging +import scala.collection.JavaConversions._ -class YarnJobFactory extends StreamJobFactory { +class YarnJobFactory extends StreamJobFactory with Logging { def getJob(config: Config) = { // TODO fix this. needed to support http package locations. val hConfig = new YarnConfiguration @@ -37,6 +39,13 @@ class YarnJobFactory extends StreamJobFactory { if (config.containsKey(YarnConfiguration.RM_ADDRESS)) { hConfig.set(YarnConfiguration.RM_ADDRESS, config.get(YarnConfiguration.RM_ADDRESS, "0.0.0.0:8032")) } + + // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration + val fsImplConfig = new FileSystemImplConfig(config) + fsImplConfig.getSchemes.foreach( + (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme)) + ) + new YarnJob(config, hConfig) } } http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java new file mode 100644 index 0000000..6e11c66 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestFileSystemImplConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +public class TestFileSystemImplConfig { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFileSystemImplConfigSuccess() { + Map<String, String> configMap = new HashMap<>(); + + configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem"); + configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem"); + + Config conf = new MapConfig(configMap); + + FileSystemImplConfig manager = new FileSystemImplConfig(conf); + assertEquals(2, manager.getSchemes().size()); + assertEquals("http", manager.getSchemes().get(0)); + assertEquals("myscheme", manager.getSchemes().get(1)); + + assertEquals("fs.http.impl", manager.getFsImplKey("http")); + assertEquals("fs.myscheme.impl", manager.getFsImplKey("myscheme")); + + assertEquals("org.apache.samza.HttpFileSystem", manager.getFsImplClassName("http")); + assertEquals("org.apache.samza.MySchemeFileSystem", manager.getFsImplClassName("myscheme")); + } + + @Test + public void testNullConfig() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("config cannot be null"); + FileSystemImplConfig manager = new FileSystemImplConfig(null); + } + + @Test + public void testEmptyImpl() { + thrown.expect(LocalizerResourceException.class); + thrown.expectMessage("fs.http.impl does not have configured class implementation"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("fs.http.impl", ""); + Config conf = new MapConfig(configMap); + + FileSystemImplConfig manager = new FileSystemImplConfig(conf); + manager.getFsImplClassName("http"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java new file mode 100644 index 0000000..e003125 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceConfig.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class TestLocalizerResourceConfig { + + @Rule + public ExpectedException thrown= ExpectedException.none(); + + @Test + public void testResourceConfigIncluded() { + Map<String, String> configMap = new HashMap<>(); + + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "file"); + configMap.put("yarn.resources.myResource1.local.visibility", "public"); + + Config conf = new MapConfig(configMap); + + LocalizerResourceConfig manager = new LocalizerResourceConfig(conf); + assertEquals(1, manager.getResourceNames().size()); + assertEquals("myResource1", manager.getResourceNames().get(0)); + assertEquals("readme", manager.getResourceLocalName("myResource1")); + assertEquals(LocalResourceType.FILE, manager.getResourceLocalType("myResource1")); + assertEquals(LocalResourceVisibility.PUBLIC, manager.getResourceLocalVisibility("myResource1")); + } + + @Test + public void testResourcrConfigNotIncluded() { + Map<String, String> configMap = new HashMap<>(); + + configMap.put("otherconfig", "https://host2.com/not_included"); + configMap.put("yarn.resources.myResource2.local.name", "notExisting"); + configMap.put("yarn.resources.myResource2.local.type", "file"); + configMap.put("yarn.resources.myResource2.local.visibility", "application"); + + Config conf = new MapConfig(configMap); + + LocalizerResourceConfig manager = new LocalizerResourceConfig(conf); + assertEquals(0, manager.getResourceNames().size()); + } + + @Test + public void testNullConfig() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("config cannot be null"); + LocalizerResourceConfig manager = new LocalizerResourceConfig(null); + } + + @Test + public void testInvalidVisibility() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "file"); + configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility"); + Config conf = new MapConfig(configMap); + + LocalizerResourceConfig manager = new LocalizerResourceConfig(conf); + manager.getResourceLocalVisibility("myResource1"); + } + + @Test + public void testInvalidType() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "invalidType"); + configMap.put("yarn.resources.myResource1.local.visibility", "application"); + Config conf = new MapConfig(configMap); + + LocalizerResourceConfig manager = new LocalizerResourceConfig(conf); + manager.getResourceLocalType("myResource1"); + } + + @Test + public void testInvalidPath() { + thrown.expect(LocalizerResourceException.class); + thrown.expectMessage("resource path is required but not defined in config for resource myResource1"); + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", ""); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "invalidType"); + configMap.put("yarn.resources.myResource1.local.visibility", "application"); + Config conf = new MapConfig(configMap); + + LocalizerResourceConfig manager = new LocalizerResourceConfig(conf); + manager.getResourcePath("myResource1"); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java new file mode 100644 index 0000000..d065019 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestLocalizerResourceMapper.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.junit.Rule; +import org.junit.rules.ExpectedException; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +public class TestLocalizerResourceMapper { + + @Rule + public ExpectedException thrown= ExpectedException.none(); + + @Test + public void testResourceMapSuccess() { + + Map<String, String> configMap = new HashMap<>(); + + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "file"); + configMap.put("yarn.resources.myResource1.local.visibility", "public"); + + configMap.put("yarn.resources.myResource2.path", "https://host2.com/package"); + configMap.put("yarn.resources.myResource2.local.name", "__package"); + configMap.put("yarn.resources.myResource2.local.type", "archive"); + configMap.put("yarn.resources.myResource2.local.visibility", "private"); + + configMap.put("yarn.resources.myResource3.path", "https://host3.com/csr"); + configMap.put("yarn.resources.myResource3.local.name", "csr"); + configMap.put("yarn.resources.myResource3.local.type", "file"); + configMap.put("yarn.resources.myResource3.local.visibility", "application"); + + configMap.put("otherconfig", "https://host4.com/not_included"); + configMap.put("yarn.resources.myResource4.local.name", "notExisting"); + configMap.put("yarn.resources.myResource4.local.type", "file"); + configMap.put("yarn.resources.myResource4.local.visibility", "application"); + + Config conf = new MapConfig(configMap); + + YarnConfiguration yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName()); + yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName()); + + LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration); + Map<String, LocalResource> resourceMap = mapper.getResourceMap(); + + assertEquals("resourceMap has 3 resources", 3, resourceMap.size()); + + // resource1 + assertEquals("host1.com", resourceMap.get("readme").getResource().getHost()); + assertEquals(LocalResourceType.FILE, resourceMap.get("readme").getType()); + assertEquals(LocalResourceVisibility.PUBLIC, resourceMap.get("readme").getVisibility()); + + // resource 2 + assertEquals("host2.com", resourceMap.get("__package").getResource().getHost()); + assertEquals(LocalResourceType.ARCHIVE, resourceMap.get("__package").getType()); + assertEquals(LocalResourceVisibility.PRIVATE, resourceMap.get("__package").getVisibility()); + + // resource 3 + assertEquals("host3.com", resourceMap.get("csr").getResource().getHost()); + assertEquals(LocalResourceType.FILE, resourceMap.get("csr").getType()); + assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("csr").getVisibility()); + + // resource 4 should not exist + assertNull("Resource does not exist with the name myResource4", resourceMap.get("myResource4")); + assertNull("Resource does not exist with the defined config name notExisting for myResource4 either", resourceMap.get("notExisting")); + } + + @Test + public void testResourceMapWithDefaultValues() { + + Map<String, String> configMap = new HashMap<>(); + + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + + Config conf = new MapConfig(configMap); + + YarnConfiguration yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName()); + + LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration); + Map<String, LocalResource> resourceMap = mapper.getResourceMap(); + + assertNull("Resource does not exist with a name readme", resourceMap.get("readme")); + assertNotNull("Resource exists with a name myResource1", resourceMap.get("myResource1")); + assertEquals("host1.com", resourceMap.get("myResource1").getResource().getHost()); + assertEquals(LocalResourceType.FILE, resourceMap.get("myResource1").getType()); + assertEquals(LocalResourceVisibility.APPLICATION, resourceMap.get("myResource1").getVisibility()); + } + + @Test + public void testResourceMapWithFileStatusFailure() { + thrown.expect(LocalizerResourceException.class); + thrown.expectMessage("IO Exception when accessing the resource file status from the filesystem"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", "unknown://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "file"); + configMap.put("yarn.resources.myResource1.local.visibility", "public"); + Config conf = new MapConfig(configMap); + + YarnConfiguration yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName()); + yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName()); + LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration); + } + + @Test + public void testResourceMapWithInvalidVisibilityFailure() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceVisibility.INVALIDVISIBILITY"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "file"); + configMap.put("yarn.resources.myResource1.local.visibility", "invalidVisibility"); + Config conf = new MapConfig(configMap); + + YarnConfiguration yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName()); + yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName()); + LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration); + } + + @Test + public void testResourceMapWithInvalidTypeFailure() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("No enum constant org.apache.hadoop.yarn.api.records.LocalResourceType.INVALIDTYPE"); + + Map<String, String> configMap = new HashMap<>(); + configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme"); + configMap.put("yarn.resources.myResource1.local.name", "readme"); + configMap.put("yarn.resources.myResource1.local.type", "invalidType"); + configMap.put("yarn.resources.myResource1.local.visibility", "public"); + Config conf = new MapConfig(configMap); + + YarnConfiguration yarnConfiguration = new YarnConfiguration(); + yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName()); + yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName()); + LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java new file mode 100644 index 0000000..11077f0 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJobFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.samza.config.MapConfig; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +public class TestYarnJobFactory { + @Test + public void testGetJobWithDefaultFsImpl() { + YarnJobFactory jobFactory = new YarnJobFactory(); + YarnJob yarnJob = jobFactory.getJob(new MapConfig()); + Configuration hConfig = yarnJob.client().yarnClient().getConfig(); + assertEquals(HttpFileSystem.class.getName(), hConfig.get("fs.http.impl")); + assertEquals(HttpFileSystem.class.getName(), hConfig.get("fs.https.impl")); + } + + @Test + public void testGetJobWithFsImplOverride() { + YarnJobFactory jobFactory = new YarnJobFactory(); + YarnJob yarnJob = jobFactory.getJob(new MapConfig(ImmutableMap.of( + "fs.http.impl", "org.apache.myHttp", + "fs.myscheme.impl","org.apache.myScheme"))); + Configuration hConfig = yarnJob.client().yarnClient().getConfig(); + assertEquals("org.apache.myHttp", hConfig.get("fs.http.impl")); + assertEquals("org.apache.myScheme", hConfig.get("fs.myscheme.impl")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/e4cfeebe/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala deleted file mode 100644 index 110c0fc..0000000 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnJobFactory.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn - -import org.apache.samza.config.MapConfig -import org.apache.samza.util.hadoop.HttpFileSystem -import org.junit.Assert._ -import org.junit.Test - - -class TestYarnJobFactory { - - @Test - def testGetJob { - - val jobFactory = new YarnJobFactory - - val yarnJob = jobFactory.getJob(new MapConfig) - - val hConfig = yarnJob.client.yarnClient.getConfig - - assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.http.impl")) - - assertEquals(classOf[HttpFileSystem].getName, hConfig.get("fs.https.impl")) - - } -} -
