http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
 
b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
index bcff6d2..83e5ff1 100644
--- 
a/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
+++ 
b/lang/cs/Org.Apache.REEF.Client/YARN/RESTClient/FileSystemJobResourceUploader.cs
@@ -16,11 +16,15 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using System.IO;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.IO.FileSystem;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Client.YARN.RestClient
@@ -36,41 +40,72 @@ namespace Org.Apache.REEF.Client.YARN.RestClient
         private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 
0, 0, 0);
         private readonly IResourceArchiveFileGenerator 
_resourceArchiveFileGenerator;
         private readonly IFileSystem _fileSystem;
+        private readonly REEFFileNames _reefFileNames;
+        private readonly IFile _file;
 
         [Inject]
         private FileSystemJobResourceUploader(
             IResourceArchiveFileGenerator resourceArchiveFileGenerator,
-            IFileSystem fileSystem)
+            IFileSystem fileSystem,
+            REEFFileNames reefFileNames,
+            IFile file)
         {
             _fileSystem = fileSystem;
             _resourceArchiveFileGenerator = resourceArchiveFileGenerator;
+            _reefFileNames = reefFileNames;
+            _file = file;
         }
 
-        public JobResource UploadJobResource(string driverLocalFolderPath, 
string jobSubmissionDirectory)
+        public JobResource UploadArchiveResource(string driverLocalFolderPath, 
string remoteUploadDirectoryPath)
         {
             driverLocalFolderPath = driverLocalFolderPath.TrimEnd('\\') + @"\";
-            var driverUploadPath = jobSubmissionDirectory.TrimEnd('/') + @"/";
+            var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
+            var parentDirectoryUri = 
_fileSystem.CreateUriForPath(remoteUploadDirectoryPath);
             Log.Log(Level.Verbose, "DriverFolderPath: {0} DriverUploadPath: 
{1}", driverLocalFolderPath, driverUploadPath);
-            var archivePath = 
_resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+            
+            _fileSystem.CreateDirectory(parentDirectoryUri);
 
-            var destinationPath = driverUploadPath + 
Path.GetFileName(archivePath);
-            var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
-            Log.Log(Level.Verbose, @"Copy {0} to {1}", archivePath, 
remoteFileUri);
+            var archivePath = 
_resourceArchiveFileGenerator.CreateArchiveToUpload(driverLocalFolderPath);
+            return GetJobResource(archivePath, ResourceType.ARCHIVE, 
driverUploadPath);
+        }
 
+        public JobResource UploadFileResource(string fileLocalPath, string 
remoteUploadDirectoryPath)
+        {
+            var driverUploadPath = remoteUploadDirectoryPath.TrimEnd('/') + 
@"/";
             var parentDirectoryUri = 
_fileSystem.CreateUriForPath(driverUploadPath);
+
             _fileSystem.CreateDirectory(parentDirectoryUri);
-            _fileSystem.CopyFromLocal(archivePath, remoteFileUri);
+            return GetJobResource(fileLocalPath, ResourceType.FILE, 
remoteUploadDirectoryPath);
+        }
+
+        private JobResource GetJobResource(string filePath, ResourceType 
resourceType, string driverUploadPath)
+        {
+            if (!_file.Exists(filePath))
+            {
+                Exceptions.Throw(
+                    new FileNotFoundException("Could not find resource file " 
+ filePath),
+                    Log);
+            }
+
+            var destinationPath = driverUploadPath + 
Path.GetFileName(filePath);
+            var remoteFileUri = _fileSystem.CreateUriForPath(destinationPath);
+
+            Log.Log(Level.Verbose, @"Copy {0} to {1}", filePath, 
remoteFileUri);
+
+            _fileSystem.CopyFromLocal(filePath, remoteFileUri);
             var fileStatus = _fileSystem.GetFileStatus(remoteFileUri);
 
             return new JobResource
             {
+                Name = Path.GetFileNameWithoutExtension(filePath),
                 LastModificationUnixTimestamp = 
DateTimeToUnixTimestamp(fileStatus.ModificationTime),
                 RemoteUploadPath = remoteFileUri.AbsoluteUri,
-                ResourceSize = fileStatus.LengthBytes
+                ResourceSize = fileStatus.LengthBytes,
+                ResourceType = resourceType
             };
         }
 
-        private long DateTimeToUnixTimestamp(DateTime dateTime)
+        private static long DateTimeToUnixTimestamp(DateTime dateTime)
         {
             return (long)(dateTime - Epoch).TotalSeconds;
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
index 6a14c5c..21e3f7d 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/WindowsYarnJobCommandProvider.cs
@@ -91,11 +91,12 @@ namespace Org.Apache.REEF.Client.YARN
             }
 
             sb.Append(" " + LauncherClassName);
+            sb.Append(" " + _fileNames.GetJobSubmissionParametersFile());
             sb.Append(" " +
                       string.Format("{0}/{1}/{2}",
                           _fileNames.GetReefFolderName(),
                           _fileNames.GetLocalFolderName(),
-                          _fileNames.GetJobSubmissionParametersFile()));
+                          _fileNames.GetAppSubmissionParametersFile()));
             sb.Append(" " + _fileNames.GetDriverLoggingConfigCommand());
             return sb.ToString();
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
index ce228ef..81912ea 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNREEFClient.cs
@@ -21,20 +21,14 @@ using System.IO;
 using System.Linq;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Avro;
-using Org.Apache.REEF.Client.Avro.YARN;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn.RestClient;
 using Org.Apache.REEF.Client.YARN;
-using Org.Apache.REEF.Client.YARN.Parameters;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
-using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Common.Files;
-using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Client.Yarn
 {
@@ -48,11 +42,9 @@ namespace Org.Apache.REEF.Client.Yarn
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(YarnREEFClient));
         private readonly DriverFolderPreparationHelper 
_driverFolderPreparationHelper;
         private readonly IJavaClientLauncher _javaClientLauncher;
-        private readonly string _securityTokenKind;
-        private readonly string _securityTokenService;
-        private readonly string _jobSubmissionPrefix;
         private readonly REEFFileNames _fileNames;
         private readonly IYarnRMClient _yarnClient;
+        private readonly YarnREEFParamSerializer _paramSerializer;
 
         [Inject]
         internal YarnREEFClient(IJavaClientLauncher javaClientLauncher,
@@ -60,18 +52,14 @@ namespace Org.Apache.REEF.Client.Yarn
             REEFFileNames fileNames,
             YarnCommandLineEnvironment yarn,
             IYarnRMClient yarnClient,
-            [Parameter(typeof(SecurityTokenKindParameter))] string 
securityTokenKind,
-            [Parameter(typeof(SecurityTokenServiceParameter))] string 
securityTokenService,
-            [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string 
jobSubmissionPrefix)
+            YarnREEFParamSerializer paramSerializer)
         {
-            _jobSubmissionPrefix = jobSubmissionPrefix;
-            _securityTokenKind = securityTokenKind;
-            _securityTokenService = securityTokenService;
             _javaClientLauncher = javaClientLauncher;
             _javaClientLauncher.AddToClassPath(yarn.GetYarnClasspathList());
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
             _fileNames = fileNames;
             _yarnClient = yarnClient;
+            _paramSerializer = paramSerializer;
         }
 
         public void Submit(IJobSubmission jobSubmission)
@@ -123,41 +111,12 @@ namespace Org.Apache.REEF.Client.Yarn
 
             // TODO: Remove this when we have a generalized way to pass config 
to java
             var paramInjector = 
TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
-                
-            var avroJobSubmissionParameters = new AvroJobSubmissionParameters
-            {
-                jobId = jobSubmission.JobIdentifier,
-                tcpBeginPort = 
paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
-                tcpRangeCount = 
paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
-                tcpTryCount = 
paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
-                jobSubmissionFolder = driverFolderPath
-            };
-
-            var avroYarnJobSubmissionParameters = new 
AvroYarnJobSubmissionParameters
-            {
-                driverMemory = jobSubmission.DriverMemory,
-                driverRecoveryTimeout = 
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds,
 int>(),
-                jobSubmissionDirectoryPrefix = _jobSubmissionPrefix,
-                sharedJobSubmissionParameters = avroJobSubmissionParameters
-            };
-
-            var avroYarnClusterJobSubmissionParameters = new 
AvroYarnClusterJobSubmissionParameters
-            {
-                maxApplicationSubmissions = 
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions,
 int>(),
-                securityTokenKind = _securityTokenKind,
-                securityTokenService = _securityTokenService,
-                yarnJobSubmissionParameters = avroYarnJobSubmissionParameters
-            };
-
-            var submissionArgsFilePath = Path.Combine(driverFolderPath, 
_fileNames.GetJobSubmissionParametersFile());
-            using (var argsFileStream = new FileStream(submissionArgsFilePath, 
FileMode.CreateNew))
-            {
-                var serializedArgs = 
AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters);
-                argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
-            }
+
+            var submissionJobArgsFilePath = 
_paramSerializer.SerializeJobFile(jobSubmission, driverFolderPath);
+            var submissionAppArgsFilePath = 
_paramSerializer.SerializeAppFile(jobSubmission, paramInjector, 
driverFolderPath);
 
             // Submit the driver
-            _javaClientLauncher.Launch(JavaClassName, submissionArgsFilePath);
+            _javaClientLauncher.Launch(JavaClassName, 
submissionJobArgsFilePath, submissionAppArgsFilePath);
             Logger.Log(Level.Info, "Submitted the Driver for execution." + 
jobSubmission.JobIdentifier);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
index 89dad2b..33ab5b9 100644
--- a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetClient.cs
@@ -16,24 +16,21 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.Avro;
-using Org.Apache.REEF.Client.Avro.YARN;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.Yarn;
 using Org.Apache.REEF.Client.Yarn.RestClient;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
-using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace Org.Apache.REEF.Client.YARN
 {
@@ -53,6 +50,7 @@ namespace Org.Apache.REEF.Client.YARN
         private readonly IYarnJobCommandProvider _yarnJobCommandProvider;
         private readonly REEFFileNames _fileNames;
         private readonly IJobSubmissionDirectoryProvider 
_jobSubmissionDirectoryProvider;
+        private readonly YarnREEFDotNetParamSerializer _paramSerializer;
 
         [Inject]
         private YarnREEFDotNetClient(
@@ -61,7 +59,8 @@ namespace Org.Apache.REEF.Client.YARN
             IJobResourceUploader jobResourceUploader,
             IYarnJobCommandProvider yarnJobCommandProvider,
             REEFFileNames fileNames,
-            IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider)
+            IJobSubmissionDirectoryProvider jobSubmissionDirectoryProvider,
+            YarnREEFDotNetParamSerializer paramSerializer)
         {
             _jobSubmissionDirectoryProvider = jobSubmissionDirectoryProvider;
             _fileNames = fileNames;
@@ -69,6 +68,7 @@ namespace Org.Apache.REEF.Client.YARN
             _jobResourceUploader = jobResourceUploader;
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
             _yarnRMClient = yarnRMClient;
+            _paramSerializer = paramSerializer;
         }
 
         public void Submit(IJobSubmission jobSubmission)
@@ -94,40 +94,21 @@ namespace Org.Apache.REEF.Client.YARN
 
                 // prepare configuration
                 var paramInjector = 
TangFactory.GetTang().NewInjector(jobSubmission.DriverConfigurations.ToArray());
-                int maxApplicationSubmissions =
+                var maxApplicationSubmissions = 
                     
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions,
 int>();
 
-                var avroJobSubmissionParameters = new 
AvroJobSubmissionParameters
-                {
-                    jobId = jobSubmission.JobIdentifier,
-                    tcpBeginPort = 
paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
-                    tcpRangeCount = 
paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
-                    tcpTryCount = 
paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>(),
-                    jobSubmissionFolder = localDriverFolderPath
-                };
-
-                var avroYarnJobSubmissionParameters = new 
AvroYarnJobSubmissionParameters
-                {
-                    driverMemory = jobSubmission.DriverMemory,
-                    driverRecoveryTimeout =
-                        
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds,
 int>(),
-                    jobSubmissionDirectoryPrefix = jobSubmissionDirectory,
-                    dfsJobSubmissionFolder = jobSubmissionDirectory,
-                    sharedJobSubmissionParameters = avroJobSubmissionParameters
-                };
-
-                var submissionArgsFilePath = 
Path.Combine(localDriverFolderPath,
-                    _fileNames.GetLocalFolderPath(),
-                    _fileNames.GetJobSubmissionParametersFile());
-                using (var argsFileStream = new 
FileStream(submissionArgsFilePath, FileMode.CreateNew))
-                {
-                    var serializedArgs =
-                        
AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters);
-                    argsFileStream.Write(serializedArgs, 0, 
serializedArgs.Length);
-                }
+                _paramSerializer.SerializeAppFile(jobSubmission, 
paramInjector, localDriverFolderPath);
+                _paramSerializer.SerializeJobFile(jobSubmission, 
localDriverFolderPath, jobSubmissionDirectory);
+
+                var archiveResource = 
_jobResourceUploader.UploadArchiveResource(localDriverFolderPath, 
jobSubmissionDirectory);
+
+                // Path to the job args file.
+                var jobArgsFilePath = Path.Combine(localDriverFolderPath, 
_fileNames.GetJobSubmissionParametersFile());
+
+                var argFileResource = 
_jobResourceUploader.UploadFileResource(jobArgsFilePath, 
jobSubmissionDirectory);
 
                 // upload prepared folder to DFS
-                var jobResource = 
_jobResourceUploader.UploadJobResource(localDriverFolderPath, 
jobSubmissionDirectory);
+                var jobResources = new List<JobResource> { archiveResource, 
argFileResource };
 
                 // submit job
                 Log.Log(Level.Verbose, @"Assigned application id {0}", 
applicationId);
@@ -135,7 +116,7 @@ namespace Org.Apache.REEF.Client.YARN
                 var submissionReq = 
CreateApplicationSubmissionRequest(jobSubmission,
                     applicationId,
                     maxApplicationSubmissions,
-                    jobResource);
+                    jobResources);
                 var submittedApplication = 
_yarnRMClient.SubmitApplicationAsync(submissionReq).GetAwaiter().GetResult();
                 Log.Log(Level.Info, @"Submitted application {0}", 
submittedApplication.Id);
             }
@@ -163,13 +144,16 @@ namespace Org.Apache.REEF.Client.YARN
            IJobSubmission jobSubmission,
            string appId,
            int maxApplicationSubmissions,
-           JobResource jobResource)
+           IReadOnlyCollection<JobResource> jobResources)
         {
             string command = _yarnJobCommandProvider.GetJobSubmissionCommand();
             Log.Log(Level.Verbose, "Command for YARN: {0}", command);
             Log.Log(Level.Verbose, "ApplicationID: {0}", appId);
             Log.Log(Level.Verbose, "MaxApplicationSubmissions: {0}", 
maxApplicationSubmissions);
-            Log.Log(Level.Verbose, "Driver archive location: {0}", 
jobResource.RemoteUploadPath);
+            foreach (var jobResource in jobResources)
+            {
+                Log.Log(Level.Verbose, "Remote file: {0}", 
jobResource.RemoteUploadPath);
+            }
 
             var submitApplication = new SubmitApplication
             {
@@ -188,24 +172,7 @@ namespace Org.Apache.REEF.Client.YARN
                 UnmanagedAM = false,
                 AmContainerSpec = new AmContainerSpec
                 {
-                    LocalResources = new LocalResources
-                    {
-                        Entries = new[]
-                        {
-                            new KeyValuePair<string, LocalResourcesValue>
-                            {
-                                Key = _fileNames.GetReefFolderName(),
-                                Value = new LocalResourcesValue
-                                {
-                                    Resource = jobResource.RemoteUploadPath,
-                                    Type = ResourceType.ARCHIVE,
-                                    Visibility = Visibility.APPLICATION,
-                                    Size = jobResource.ResourceSize,
-                                    Timestamp = 
jobResource.LastModificationUnixTimestamp
-                                }
-                            }
-                        }
-                    },
+                    LocalResources = CreateLocalResources(jobResources),
                     Commands = new Commands
                     {
                         Command = command
@@ -216,11 +183,30 @@ namespace Org.Apache.REEF.Client.YARN
             return submitApplication;
         }
 
+        private static LocalResources 
CreateLocalResources(IEnumerable<JobResource> jobResources)
+        {
+            return new LocalResources
+            {
+                Entries = jobResources.Select(jobResource => new 
RestClient.DataModel.KeyValuePair<string, LocalResourcesValue>
+                {
+                    Key = jobResource.Name,
+                    Value = new LocalResourcesValue
+                    {
+                        Resource = jobResource.RemoteUploadPath,
+                        Type = jobResource.ResourceType,
+                        Visibility = Visibility.APPLICATION,
+                        Size = jobResource.ResourceSize,
+                        Timestamp = jobResource.LastModificationUnixTimestamp
+                    }
+                }).ToArray()
+            };
+        }
+
         /// <summary>
         /// Creates the temporary directory to hold the job submission.
         /// </summary>
         /// <returns>The path to the folder created.</returns>
-        private string CreateDriverFolder(string jobId, string appId)
+        private static string CreateDriverFolder(string jobId, string appId)
         {
             return Path.GetFullPath(Path.Combine(Path.GetTempPath(), 
string.Join("-", "reef", jobId, appId)));
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
new file mode 100644
index 0000000..dd28702
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFDotNetParamSerializer.cs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.YARN;
+using Org.Apache.REEF.Common.Avro;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// Job/application parameter file serializer for <see 
cref="YarnREEFDotNetClient"/>.
+    /// </summary>
+    internal sealed class YarnREEFDotNetParamSerializer
+    {
+        private readonly REEFFileNames _fileNames;
+
+        [Inject]
+        private YarnREEFDotNetParamSerializer(REEFFileNames fileNames)
+        {
+            _fileNames = fileNames;
+        }
+
+        /// <summary>
+        /// Serializes the application parameters to 
reef/local/app-submission-params.json.
+        /// </summary>
+        internal void SerializeAppFile(IJobSubmission jobSubmission, IInjector 
paramInjector, string localDriverFolderPath)
+        {
+            var serializedArgs = SerializeAppArgsToBytes(jobSubmission, 
paramInjector, localDriverFolderPath);
+
+            var submissionAppArgsFilePath = Path.Combine(
+                localDriverFolderPath, _fileNames.GetLocalFolderPath(), 
_fileNames.GetAppSubmissionParametersFile());
+
+            using (var jobArgsFileStream = new 
FileStream(submissionAppArgsFilePath, FileMode.CreateNew))
+            {
+                jobArgsFileStream.Write(serializedArgs, 0, 
serializedArgs.Length);
+            }
+        }
+
+        internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, 
IInjector paramInjector, string localDriverFolderPath)
+        {
+            var avroAppSubmissionParameters = new AvroAppSubmissionParameters
+            {
+                tcpBeginPort = 
paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+                tcpRangeCount = 
paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+                tcpTryCount = 
paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>()
+            };
+
+            var avroYarnAppSubmissionParameters = new 
AvroYarnAppSubmissionParameters
+            {
+                sharedAppSubmissionParameters = avroAppSubmissionParameters,
+                driverMemory = jobSubmission.DriverMemory,
+                driverRecoveryTimeout =
+                    
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds,
 int>(),
+            };
+
+            return 
AvroJsonSerializer<AvroYarnAppSubmissionParameters>.ToBytes(avroYarnAppSubmissionParameters);
+        }
+
+        /// <summary>
+        /// Serializes the job parameters to job-submission-params.json.
+        /// </summary>
+        internal void SerializeJobFile(IJobSubmission jobSubmission, string 
localDriverFolderPath, string jobSubmissionDirectory)
+        {
+            var serializedArgs = SerializeJobArgsToBytes(jobSubmission, 
localDriverFolderPath, jobSubmissionDirectory);
+
+            var submissionJobArgsFilePath = Path.Combine(localDriverFolderPath,
+                _fileNames.GetJobSubmissionParametersFile());
+
+            using (var jobArgsFileStream = new 
FileStream(submissionJobArgsFilePath, FileMode.CreateNew))
+            {
+                jobArgsFileStream.Write(serializedArgs, 0, 
serializedArgs.Length);
+            }
+        }
+
+        internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, 
string localDriverFolderPath, string jobSubmissionDirectory)
+        {
+            var avroJobSubmissionParameters = new AvroJobSubmissionParameters
+            {
+                jobId = jobSubmission.JobIdentifier,
+                jobSubmissionFolder = localDriverFolderPath
+            };
+
+            var avroYarnJobSubmissionParameters = new 
AvroYarnJobSubmissionParameters
+            {
+                jobSubmissionDirectoryPrefix = jobSubmissionDirectory,
+                dfsJobSubmissionFolder = jobSubmissionDirectory,
+                sharedJobSubmissionParameters = avroJobSubmissionParameters
+            };
+
+            return 
AvroJsonSerializer<AvroYarnJobSubmissionParameters>.ToBytes(avroYarnJobSubmissionParameters);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs 
b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
new file mode 100644
index 0000000..e2278e4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/YARN/YarnREEFParamSerializer.cs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.IO;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Avro;
+using Org.Apache.REEF.Client.Avro.YARN;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.Parameters;
+using Org.Apache.REEF.Common.Avro;
+using Org.Apache.REEF.Common.Files;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.Client.YARN
+{
+    /// <summary>
+    /// Job/application parameter file serializer for <see 
cref="YarnREEFClient"/>.
+    /// </summary>
+    internal sealed class YarnREEFParamSerializer
+    {
+        private readonly REEFFileNames _fileNames;
+        private readonly string _securityTokenKind;
+        private readonly string _securityTokenService;
+        private readonly string _jobSubmissionPrefix;
+
+        [Inject]
+        private YarnREEFParamSerializer(
+            REEFFileNames fileNames,
+            [Parameter(typeof(SecurityTokenKindParameter))] string 
securityTokenKind,
+            [Parameter(typeof(SecurityTokenServiceParameter))] string 
securityTokenService,
+            [Parameter(typeof(JobSubmissionDirectoryPrefixParameter))] string 
jobSubmissionPrefix)
+        {
+            _fileNames = fileNames;
+            _jobSubmissionPrefix = jobSubmissionPrefix;
+            _securityTokenKind = securityTokenKind;
+            _securityTokenService = securityTokenService;
+        }
+
+        /// <summary>
+        /// Serializes the application parameters to 
reef/local/app-submission-params.json.
+        /// </summary>
+        internal string SerializeAppFile(IJobSubmission jobSubmission, 
IInjector paramInjector, string driverFolderPath)
+        {
+            var serializedArgs = SerializeAppArgsToBytes(jobSubmission, 
paramInjector);
+
+            var submissionArgsFilePath = Path.Combine(driverFolderPath, 
_fileNames.GetAppSubmissionParametersFile());
+            using (var argsFileStream = new FileStream(submissionArgsFilePath, 
FileMode.CreateNew))
+            {
+                argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+            }
+
+            return submissionArgsFilePath;
+        }
+
+        internal byte[] SerializeAppArgsToBytes(IJobSubmission jobSubmission, 
IInjector paramInjector)
+        {
+            var avroAppSubmissionParameters = new AvroAppSubmissionParameters
+            {
+                tcpBeginPort = 
paramInjector.GetNamedInstance<TcpPortRangeStart, int>(),
+                tcpRangeCount = 
paramInjector.GetNamedInstance<TcpPortRangeCount, int>(),
+                tcpTryCount = 
paramInjector.GetNamedInstance<TcpPortRangeTryCount, int>()
+            };
+
+            var avroYarnAppSubmissionParameters = new 
AvroYarnAppSubmissionParameters
+            {
+                sharedAppSubmissionParameters = avroAppSubmissionParameters,
+                driverMemory = jobSubmission.DriverMemory,
+                driverRecoveryTimeout = 
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds,
 int>()
+            };
+
+            var avroYarnClusterAppSubmissionParameters = new 
AvroYarnClusterAppSubmissionParameters
+            {
+                yarnAppSubmissionParameters = avroYarnAppSubmissionParameters,
+                maxApplicationSubmissions = 
paramInjector.GetNamedInstance<DriverBridgeConfigurationOptions.MaxApplicationSubmissions,
 int>()
+            };
+
+            return 
AvroJsonSerializer<AvroYarnClusterAppSubmissionParameters>.ToBytes(avroYarnClusterAppSubmissionParameters);
+        }
+
+        /// <summary>
+        /// Serializes the job parameters to job-submission-params.json.
+        /// </summary>
+        internal string SerializeJobFile(IJobSubmission jobSubmission, string 
driverFolderPath)
+        {
+            var serializedArgs = SerializeJobArgsToBytes(jobSubmission, 
driverFolderPath);
+
+            var submissionArgsFilePath = Path.Combine(driverFolderPath, 
_fileNames.GetJobSubmissionParametersFile());
+            using (var argsFileStream = new FileStream(submissionArgsFilePath, 
FileMode.CreateNew))
+            {
+                argsFileStream.Write(serializedArgs, 0, serializedArgs.Length);
+            }
+
+            return submissionArgsFilePath;
+        }
+
+        internal byte[] SerializeJobArgsToBytes(IJobSubmission jobSubmission, 
string driverFolderPath)
+        {
+            var avroJobSubmissionParameters = new AvroJobSubmissionParameters
+            {
+                jobId = jobSubmission.JobIdentifier,
+                jobSubmissionFolder = driverFolderPath
+            };
+
+            var avroYarnJobSubmissionParameters = new 
AvroYarnJobSubmissionParameters
+            {
+                jobSubmissionDirectoryPrefix = _jobSubmissionPrefix,
+                sharedJobSubmissionParameters = avroJobSubmissionParameters
+            };
+
+            var avroYarnClusterJobSubmissionParameters = new 
AvroYarnClusterJobSubmissionParameters
+            {
+                securityTokenKind = _securityTokenKind,
+                securityTokenService = _securityTokenService,
+                yarnJobSubmissionParameters = avroYarnJobSubmissionParameters
+            };
+
+            return 
AvroJsonSerializer<AvroYarnClusterJobSubmissionParameters>.ToBytes(avroYarnClusterJobSubmissionParameters);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs 
b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
index 321342b..94e2e6b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs
@@ -53,6 +53,7 @@ namespace Org.Apache.REEF.Common.Files
         private const string BRIDGE_EXE_CONFIG_NAME = 
"Org.Apache.REEF.Bridge.exe.config";
         private const string SECURITY_TOKEN_IDENTIFIER_FILE = 
"SecurityTokenId";
         private const string SECURITY_TOKEN_PASSWORD_FILE = "SecurityTokenPwd";
+        private const string APP_SUBMISSION_PARAMETERS_FILE = 
"app-submission-params.json";
         private const string JOB_SUBMISSION_PARAMETERS_FILE = 
"job-submission-params.json";
         private const string DRIVER_COMMAND_LOGGING_CONFIG = "1> 
<LOG_DIR>/driver.stdout 2> <LOG_DIR>/driver.stderr";
 
@@ -243,7 +244,16 @@ namespace Org.Apache.REEF.Common.Files
         }
 
         /// <summary>
-        /// The Job Submission parameters file that is used to submit a job 
through Java,
+        /// The Job Submission application parameters file that is used to 
submit a job through Java,
+        /// either directly or via a "bootstrap" method.
+        /// </summary>
+        public string GetAppSubmissionParametersFile()
+        {
+            return APP_SUBMISSION_PARAMETERS_FILE;
+        }
+
+        /// <summary>
+        /// The Job Submission job parameters file that is used to submit a 
job through Java,
         /// either directly or via a "bootstrap" method.
         /// </summary>
         public string GetJobSubmissionParametersFile()

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc 
b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
new file mode 100644
index 0000000..30205e1
--- /dev/null
+++ b/lang/java/reef-bridge-client/src/main/avro/AppSubmissionParameters.avsc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  {
+      "namespace": "org.apache.reef.reef.bridge.client.avro",
+      "type": "record",
+      "name": "AvroAppSubmissionParameters",
+      "doc": "General cross-language application submission parameters shared 
by all runtimes",
+      "fields": [
+        { "name": "tcpBeginPort", "type": "int" },
+        { "name": "tcpRangeCount", "type": "int" },
+        { "name": "tcpTryCount", "type": "int" }
+      ]
+  },
+  {
+      "namespace": "org.apache.reef.reef.bridge.client.avro",
+      "type": "record",
+      "name": "AvroLocalAppSubmissionParameters",
+      "doc": "Cross-language application submission parameters to the Local 
runtime",
+      "fields": [
+        { "name": "sharedAppSubmissionParameters", "type": 
"AvroAppSubmissionParameters" },
+        { "name": "maxNumberOfConcurrentEvaluators", "type": "int" }
+      ]
+  },
+  {
+      "namespace": "org.apache.reef.reef.bridge.client.avro",
+      "type": "record",
+      "name": "AvroYarnAppSubmissionParameters",
+      "doc": "General cross-language application submission parameters to the 
YARN runtime",
+      "fields": [
+        { "name": "sharedAppSubmissionParameters", "type": 
"AvroAppSubmissionParameters" },
+        { "name": "driverMemory", "type": "int" },
+        { "name": "driverRecoveryTimeout", "type": "int" }
+      ]
+  },
+  {
+      "namespace": "org.apache.reef.reef.bridge.client.avro",
+      "type": "record",
+      "name": "AvroYarnClusterAppSubmissionParameters",
+      "doc": "Cross-language application submission parameters to the YARN 
runtime using Hadoop's submission client",
+      "fields": [
+        { "name": "yarnAppSubmissionParameters", "type": 
"AvroYarnAppSubmissionParameters" },
+        { "name": "maxApplicationSubmissions", "type": "int" }
+      ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc 
b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index fec2431..61b9812 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -21,34 +21,19 @@
       "namespace": "org.apache.reef.reef.bridge.client.avro",
       "type": "record",
       "name": "AvroJobSubmissionParameters",
-      "doc": "General cross-language submission parameters shared by all 
runtimes",
+      "doc": "General cross-language job submission parameters shared by all 
runtimes",
       "fields": [
         { "name": "jobId", "type": "string" },
-        { "name": "tcpBeginPort", "type": "int" },
-        { "name": "tcpRangeCount", "type": "int" },
-        { "name": "tcpTryCount", "type": "int" },
         { "name": "jobSubmissionFolder", "type": "string" }
       ]
   },
   {
       "namespace": "org.apache.reef.reef.bridge.client.avro",
       "type": "record",
-      "name": "AvroLocalJobSubmissionParameters",
-      "doc": "Cross-language submission parameters to the Local runtime",
-      "fields": [
-        { "name": "sharedJobSubmissionParameters", "type": 
"AvroJobSubmissionParameters" },
-        { "name": "maxNumberOfConcurrentEvaluators", "type": "int" }
-      ]
-  },
-  {
-      "namespace": "org.apache.reef.reef.bridge.client.avro",
-      "type": "record",
       "name": "AvroYarnJobSubmissionParameters",
       "doc": "General cross-language submission parameters to the YARN 
runtime",
       "fields": [
         { "name": "sharedJobSubmissionParameters", "type": 
"AvroJobSubmissionParameters" },
-        { "name": "driverMemory", "type": "int" },
-        { "name": "driverRecoveryTimeout", "type": "int" },
         { "name": "dfsJobSubmissionFolder", "type": "string", "default": 
"NULL" },
         { "name": "jobSubmissionDirectoryPrefix", "type": "string" }
       ]
@@ -60,7 +45,6 @@
       "doc": "Cross-language submission parameters to the YARN runtime using 
Hadoop's submission client",
       "fields": [
         { "name": "yarnJobSubmissionParameters", "type": 
"AvroYarnJobSubmissionParameters" },
-        { "name": "maxApplicationSubmissions", "type": "int" },
         { "name": "securityTokenKind", "type": "string", "default": "NULL" },
         { "name": "securityTokenService", "type": "string", "default": "NULL" }
       ]

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
index 6ad77c7..e03013a 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/JobResourceUploader.java
@@ -20,6 +20,7 @@ package org.apache.reef.bridge.client;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.reef.runtime.common.files.RuntimeClasspathProvider;
 import org.apache.reef.runtime.yarn.YarnClasspathProvider;
@@ -44,16 +45,18 @@ public final class JobResourceUploader {
   /**
    * This class is invoked from 
Org.Apache.REEF.Client.Yarn.LegacyJobResourceUploader in .NET code.
    * Arguments:
-   * [0] : Local path for already generated archive
-   * [1] : Path of job submission directory
-   * [2] : File path for output with details of uploaded resource
+   * [0] : Local path for file.
+   * [1] : Type for file.
+   * [2] : Path of job submission directory
+   * [3] : File path for output with details of uploaded resource
    */
   public static void main(final String[] args) throws InjectionException, 
IOException {
-    Validate.isTrue(args.length == 3, "Job resource uploader requires 3 args");
+    Validate.isTrue(args.length == 4, "Job resource uploader requires 4 args");
     final File localFile = new File(args[0]);
-    Validate.isTrue(localFile.exists(), "Local archive does not exist " + 
localFile.getAbsolutePath());
-    final String jobSubmissionDirectory = args[1];
-    final String localOutputPath = args[2];
+    Validate.isTrue(localFile.exists(), "Local file does not exist " + 
localFile.getAbsolutePath());
+    final String fileType = args[1];
+    final String jobSubmissionDirectory = args[2];
+    final String localOutputPath = args[3];
 
     LOG.log(Level.INFO, "Received args: LocalPath " + 
localFile.getAbsolutePath() + " Submission directory " +
         jobSubmissionDirectory + " LocalOutputPath " + localOutputPath);
@@ -66,7 +69,7 @@ public final class JobResourceUploader {
         .newInjector(configuration)
         .getInstance(JobUploader.class);
     final LocalResource localResource = 
jobUploader.createJobFolder(jobSubmissionDirectory)
-        .uploadAsLocalResource(localFile);
+        .uploadAsLocalResource(localFile, LocalResourceType.valueOf(fileType));
 
     // Output: <UploadedPath>;<LastModificationUnixTimestamp>;<ResourceSize>
     final URL resource = localResource.getResource();

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
index cc308ab..3c94acc 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java
@@ -61,12 +61,19 @@ public final class LocalClient {
 
   public static void main(final String[] args) throws IOException, 
InjectionException {
     final File jobSubmissionParametersFile = new File(args[0]);
+    final File localAppSubmissionParametersFile = new File(args[1]);
+
     if (!(jobSubmissionParametersFile.exists() && 
jobSubmissionParametersFile.canRead())) {
       throw new IOException("Unable to open and read " + 
jobSubmissionParametersFile.getAbsolutePath());
     }
 
+    if (!(localAppSubmissionParametersFile.exists() && 
localAppSubmissionParametersFile.canRead())) {
+      throw new IOException("Unable to open and read " + 
localAppSubmissionParametersFile.getAbsolutePath());
+    }
+
     final LocalSubmissionFromCS localSubmissionFromCS =
-        
LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+        LocalSubmissionFromCS.fromSubmissionParameterFiles(
+            jobSubmissionParametersFile, localAppSubmissionParametersFile);
     LOG.log(Level.INFO, "Local job submission received from C#: {0}", 
localSubmissionFromCS);
     final Configuration runtimeConfiguration = 
localSubmissionFromCS.getRuntimeConfiguration();
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
index 93a8f2c..b63e20b 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalRuntimeDriverConfigurationGenerator.java
@@ -92,13 +92,20 @@ final class LocalRuntimeDriverConfigurationGenerator {
   }
 
   public static void main(final String[] args) throws InjectionException, 
IOException {
-    final File jobSubmissionParametersFile = new File(args[0]);
+    final File localAppSubmissionParametersFile = new File(args[0]);
+    final File jobSubmissionParametersFile = new File(args[1]);
+
     if (!(jobSubmissionParametersFile.exists() && 
jobSubmissionParametersFile.canRead())) {
       throw new IOException("Unable to open and read " + 
jobSubmissionParametersFile.getAbsolutePath());
     }
 
+    if (!(localAppSubmissionParametersFile.exists() && 
localAppSubmissionParametersFile.canRead())) {
+      throw new IOException("Unable to open and read " + 
localAppSubmissionParametersFile.getAbsolutePath());
+    }
+
     final LocalSubmissionFromCS localSubmission =
-        
LocalSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+        LocalSubmissionFromCS.fromSubmissionParameterFiles(
+            jobSubmissionParametersFile, localAppSubmissionParametersFile);
 
     LOG.log(Level.FINE, "Local driver config generation received from C#: 
{0}", localSubmission);
     final Configuration localRuntimeConfiguration = 
localSubmission.getRuntimeConfiguration();

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
index e8d8118..cc5bb82 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalSubmissionFromCS.java
@@ -24,8 +24,9 @@ import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.lang.Validate;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.io.TcpPortConfigurationProvider;
+import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
 import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import 
org.apache.reef.reef.bridge.client.avro.AvroLocalJobSubmissionParameters;
+import 
org.apache.reef.reef.bridge.client.avro.AvroLocalAppSubmissionParameters;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import 
org.apache.reef.runtime.common.launch.parameters.DriverLaunchCommandPrefix;
 import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration;
@@ -58,16 +59,18 @@ final class LocalSubmissionFromCS {
   private final int tcpRangeCount;
   private final int tcpTryCount;
 
-  private LocalSubmissionFromCS(final AvroLocalJobSubmissionParameters 
avroLocalJobSubmissionParameters) {
+  private LocalSubmissionFromCS(final AvroJobSubmissionParameters 
avroJobSubmissionParameters,
+                                final AvroLocalAppSubmissionParameters 
avroLocalAppSubmissionParameters) {
     // We assume the given path to be the one of the driver. The job folder is 
one level up from there.
-    final AvroJobSubmissionParameters jobSubmissionParameters =
-        avroLocalJobSubmissionParameters.getSharedJobSubmissionParameters();
-    this.driverFolder = new 
File(jobSubmissionParameters.getJobSubmissionFolder().toString());
-    this.jobId = jobSubmissionParameters.getJobId().toString();
-    this.maxNumberOfConcurrentEvaluators = 
avroLocalJobSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
-    this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
-    this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
-    this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
+    final AvroAppSubmissionParameters appSubmissionParameters =
+        avroLocalAppSubmissionParameters.getSharedAppSubmissionParameters();
+    this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort();
+    this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount();
+    this.tcpTryCount = appSubmissionParameters.getTcpTryCount();
+    this.maxNumberOfConcurrentEvaluators = 
avroLocalAppSubmissionParameters.getMaxNumberOfConcurrentEvaluators();
+
+    this.driverFolder = new 
File(avroJobSubmissionParameters.getJobSubmissionFolder().toString());
+    this.jobId = avroJobSubmissionParameters.getJobId().toString();
     this.jobFolder = driverFolder.getParentFile();
     this.runtimeRootFolder = jobFolder.getParentFile();
 
@@ -137,16 +140,29 @@ final class LocalSubmissionFromCS {
   /**
    * Takes the local job submission configuration file, deserializes it, and 
creates submission object.
    */
-  static LocalSubmissionFromCS fromJobSubmissionParametersFile(final File 
localJobSubmissionParametersFile)
+  static LocalSubmissionFromCS fromSubmissionParameterFiles(final File 
jobSubmissionParametersFile,
+                                                            final File 
localAppSubmissionParametersFile)
       throws IOException {
-    try (final FileInputStream fileInputStream = new 
FileInputStream(localJobSubmissionParametersFile)) {
+    final AvroLocalAppSubmissionParameters localAppSubmissionParameters;
+
+    final AvroJobSubmissionParameters jobSubmissionParameters;
+
+    try (final FileInputStream fileInputStream = new 
FileInputStream(jobSubmissionParametersFile)) {
       final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
-          AvroLocalJobSubmissionParameters.getClassSchema(), fileInputStream);
-      final SpecificDatumReader<AvroLocalJobSubmissionParameters> reader =
-          new SpecificDatumReader<>(AvroLocalJobSubmissionParameters.class);
-      final AvroLocalJobSubmissionParameters localJobSubmissionParameters = 
reader.read(null, decoder);
+          AvroJobSubmissionParameters.getClassSchema(), fileInputStream);
+      final SpecificDatumReader<AvroJobSubmissionParameters> reader =
+          new SpecificDatumReader<>(AvroJobSubmissionParameters.class);
+      jobSubmissionParameters = reader.read(null, decoder);
+    }
 
-      return new LocalSubmissionFromCS(localJobSubmissionParameters);
+    try (final FileInputStream fileInputStream = new 
FileInputStream(localAppSubmissionParametersFile)) {
+      final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+          AvroLocalAppSubmissionParameters.getClassSchema(), fileInputStream);
+      final SpecificDatumReader<AvroLocalAppSubmissionParameters> reader =
+          new SpecificDatumReader<>(AvroLocalAppSubmissionParameters.class);
+      localAppSubmissionParameters = reader.read(null, decoder);
     }
+
+    return new LocalSubmissionFromCS(jobSubmissionParameters, 
localAppSubmissionParameters);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
index 23f86ff..c2c5fd3 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapDriverConfigGenerator.java
@@ -25,7 +25,9 @@ import org.apache.reef.client.DriverRestartConfiguration;
 import org.apache.reef.client.parameters.DriverConfigurationProviders;
 import org.apache.reef.io.TcpPortConfigurationProvider;
 import org.apache.reef.javabridge.generic.JobDriver;
+import org.apache.reef.reef.bridge.client.avro.AvroAppSubmissionParameters;
 import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
 import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
 import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier;
 import org.apache.reef.runtime.common.files.REEFFileNames;
@@ -63,31 +65,31 @@ final class YarnBootstrapDriverConfigGenerator {
     this.reefFileNames = reefFileNames;
   }
 
-  public String writeDriverConfigurationFile(final String 
bootstrapArgsLocation) throws IOException {
-    final File bootstrapArgsFile = new File(bootstrapArgsLocation);
-    final AvroYarnJobSubmissionParameters yarnBootstrapArgs =
-        readYarnJobSubmissionParametersFromFile(bootstrapArgsFile);
+  public String writeDriverConfigurationFile(final String 
bootstrapJobArgsLocation,
+                                             final String 
bootstrapAppArgsLocation) throws IOException {
+    final File bootstrapJobArgsFile = new 
File(bootstrapJobArgsLocation).getCanonicalFile();
+    final File bootstrapAppArgsFile = new File(bootstrapAppArgsLocation);
+
+    final AvroYarnJobSubmissionParameters yarnBootstrapJobArgs =
+        readYarnJobSubmissionParametersFromFile(bootstrapJobArgsFile);
+
+    final AvroYarnAppSubmissionParameters yarnBootstrapAppArgs =
+        readYarnAppSubmissionParametersFromFile(bootstrapAppArgsFile);
+
     final String driverConfigPath = reefFileNames.getDriverConfigurationPath();
 
-    
this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapArgs),
+    
this.configurationSerializer.toFile(getYarnDriverConfiguration(yarnBootstrapJobArgs,
 yarnBootstrapAppArgs),
         new File(driverConfigPath));
 
     return driverConfigPath;
   }
 
   static Configuration getYarnDriverConfiguration(
-      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams) {
+      final AvroYarnJobSubmissionParameters yarnJobSubmissionParams,
+      final AvroYarnAppSubmissionParameters yarnAppSubmissionParams) {
+
     final AvroJobSubmissionParameters jobSubmissionParameters =
         yarnJobSubmissionParams.getSharedJobSubmissionParameters();
-    final Configuration providerConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindSetEntry(DriverConfigurationProviders.class, 
TcpPortConfigurationProvider.class)
-        .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(jobSubmissionParameters.getTcpBeginPort()))
-        .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(jobSubmissionParameters.getTcpRangeCount()))
-        .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(jobSubmissionParameters.getTcpTryCount()))
-        .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
-            
yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
-        .build();
-
     final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
         .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY,
             yarnJobSubmissionParams.getDfsJobSubmissionFolder().toString())
@@ -96,10 +98,21 @@ final class YarnBootstrapDriverConfigGenerator {
         .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0)
         .build();
 
+    final AvroAppSubmissionParameters appSubmissionParams = 
yarnAppSubmissionParams.getSharedAppSubmissionParameters();
+
+    final Configuration providerConfig = 
Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(DriverConfigurationProviders.class, 
TcpPortConfigurationProvider.class)
+        .bindNamedParameter(TcpPortRangeBegin.class, 
Integer.toString(appSubmissionParams.getTcpBeginPort()))
+        .bindNamedParameter(TcpPortRangeCount.class, 
Integer.toString(appSubmissionParams.getTcpRangeCount()))
+        .bindNamedParameter(TcpPortRangeTryCount.class, 
Integer.toString(appSubmissionParams.getTcpTryCount()))
+        .bindNamedParameter(JobSubmissionDirectoryPrefix.class,
+            
yarnJobSubmissionParams.getJobSubmissionDirectoryPrefix().toString())
+        .build();
+
     final Configuration driverConfiguration = Configurations.merge(
         Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER, 
yarnDriverConfiguration, providerConfig);
 
-    if (yarnJobSubmissionParams.getDriverRecoveryTimeout() > 0) {
+    if (yarnAppSubmissionParams.getDriverRecoveryTimeout() > 0) {
       LOG.log(Level.FINE, "Driver restart is enabled.");
 
       final Configuration yarnDriverRestartConfiguration =
@@ -113,7 +126,7 @@ final class YarnBootstrapDriverConfigGenerator {
               .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING,
                   JobDriver.DriverRestartRunningTaskHandler.class)
               
.set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS,
-                  yarnJobSubmissionParams.getDriverRecoveryTimeout())
+                  yarnAppSubmissionParams.getDriverRecoveryTimeout())
               .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
                   JobDriver.DriverRestartCompletedHandler.class)
               
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
@@ -126,6 +139,23 @@ final class YarnBootstrapDriverConfigGenerator {
     return driverConfiguration;
   }
 
+  static AvroYarnAppSubmissionParameters 
readYarnAppSubmissionParametersFromFile(final File file)
+      throws IOException {
+    try (final FileInputStream fileInputStream = new FileInputStream(file)) {
+      // This is mainly a test hook.
+      return readYarnAppSubmissionParametersFromInputStream(fileInputStream);
+    }
+  }
+
+  static AvroYarnAppSubmissionParameters 
readYarnAppSubmissionParametersFromInputStream(
+      final InputStream inputStream) throws IOException {
+    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+        AvroYarnAppSubmissionParameters.getClassSchema(), inputStream);
+    final SpecificDatumReader<AvroYarnAppSubmissionParameters> reader = new 
SpecificDatumReader<>(
+        AvroYarnAppSubmissionParameters.class);
+    return reader.read(null, decoder);
+  }
+
   static AvroYarnJobSubmissionParameters 
readYarnJobSubmissionParametersFromFile(final File file)
       throws IOException {
     try (final FileInputStream fileInputStream = new FileInputStream(file)) {

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
index 5f6b5c2..223c853 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnBootstrapREEFLauncher.java
@@ -41,9 +41,20 @@ public final class YarnBootstrapREEFLauncher {
   public static void main(final String[] args) throws IOException, 
InjectionException {
     LOG.log(Level.INFO, "Entering BootstrapLauncher.main().");
 
-    if (args.length != 1) {
-      final String message = "Bootstrap launcher should have a single 
configuration file input specifying the" +
-          " job submission parameters to be deserialized to create the 
YarnDriverConfiguration on the fly.";
+    if (args.length != 2) {
+      final StringBuilder sb = new StringBuilder();
+      sb.append("[ ");
+      for (String arg : args) {
+        sb.append(arg);
+        sb.append(" ");
+      }
+
+      sb.append("]");
+
+      final String message = "Bootstrap launcher should have two configuration 
file inputs, one specifying the" +
+          " application submission parameters to be deserialized and the other 
specifying the job" +
+          " submission parameters to be deserialized to create the 
YarnDriverConfiguration on the fly." +
+          " Current args are " + sb.toString();
 
       throw fatal(message, new IllegalArgumentException(message));
     }
@@ -51,7 +62,7 @@ public final class YarnBootstrapREEFLauncher {
     try {
       final YarnBootstrapDriverConfigGenerator 
yarnDriverConfigurationGenerator =
           
Tang.Factory.getTang().newInjector().getInstance(YarnBootstrapDriverConfigGenerator.class);
-      REEFLauncher.main(new 
String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0])});
+      REEFLauncher.main(new 
String[]{yarnDriverConfigurationGenerator.writeDriverConfigurationFile(args[0], 
args[1])});
     } catch (final Exception exception) {
       if (!(exception instanceof RuntimeException)) {
         throw fatal("Failed to initialize configurations.", exception);

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
index 6783aa8..6fa3f83 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnClusterSubmissionFromCS.java
@@ -22,9 +22,7 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.commons.lang.Validate;
-import org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters;
-import 
org.apache.reef.reef.bridge.client.avro.AvroYarnClusterJobSubmissionParameters;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.*;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -56,22 +54,29 @@ final class YarnClusterSubmissionFromCS {
   private final String tokenKind;
   private final String tokenService;
   private final String jobSubmissionDirectoryPrefix;
+
+  private final AvroYarnAppSubmissionParameters yarnAppSubmissionParameters;
   private final AvroYarnJobSubmissionParameters yarnJobSubmissionParameters;
 
-  private YarnClusterSubmissionFromCS(final 
AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
+  private YarnClusterSubmissionFromCS(final 
AvroYarnClusterAppSubmissionParameters yarnClusterAppSubmissionParameters,
+                                      final 
AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters) {
     yarnJobSubmissionParameters = 
yarnClusterJobSubmissionParameters.getYarnJobSubmissionParameters();
+    yarnAppSubmissionParameters = 
yarnClusterAppSubmissionParameters.getYarnAppSubmissionParameters();
 
     final AvroJobSubmissionParameters jobSubmissionParameters =
         yarnJobSubmissionParameters.getSharedJobSubmissionParameters();
 
+    final AvroAppSubmissionParameters appSubmissionParameters =
+        yarnAppSubmissionParameters.getSharedAppSubmissionParameters();
+
     this.driverFolder = new 
File(jobSubmissionParameters.getJobSubmissionFolder().toString());
     this.jobId = jobSubmissionParameters.getJobId().toString();
-    this.tcpBeginPort = jobSubmissionParameters.getTcpBeginPort();
-    this.tcpRangeCount = jobSubmissionParameters.getTcpRangeCount();
-    this.tcpTryCount = jobSubmissionParameters.getTcpTryCount();
-    this.maxApplicationSubmissions = 
yarnClusterJobSubmissionParameters.getMaxApplicationSubmissions();
-    this.driverRecoveryTimeout = 
yarnJobSubmissionParameters.getDriverRecoveryTimeout();
-    this.driverMemory = yarnJobSubmissionParameters.getDriverMemory();
+    this.tcpBeginPort = appSubmissionParameters.getTcpBeginPort();
+    this.tcpRangeCount = appSubmissionParameters.getTcpRangeCount();
+    this.tcpTryCount = appSubmissionParameters.getTcpTryCount();
+    this.maxApplicationSubmissions = 
yarnClusterAppSubmissionParameters.getMaxApplicationSubmissions();
+    this.driverRecoveryTimeout = 
yarnAppSubmissionParameters.getDriverRecoveryTimeout();
+    this.driverMemory = yarnAppSubmissionParameters.getDriverMemory();
     this.priority = DEFAULT_PRIORITY;
     this.queue = DEFAULT_QUEUE;
     this.tokenKind = 
yarnClusterJobSubmissionParameters.getSecurityTokenKind().toString();
@@ -173,6 +178,13 @@ final class YarnClusterSubmissionFromCS {
   }
 
   /**
+   * @return The submission parameters for YARN applications.
+   */
+  AvroYarnAppSubmissionParameters getYarnAppSubmissionParameters() {
+    return yarnAppSubmissionParameters;
+  }
+
+  /**
    * @return The submission parameters for YARN jobs.
    */
   AvroYarnJobSubmissionParameters getYarnJobSubmissionParameters() {
@@ -189,21 +201,31 @@ final class YarnClusterSubmissionFromCS {
   /**
    * Takes the YARN cluster job submission configuration file, deserializes 
it, and creates submission object.
    */
-  static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final 
File yarnClusterJobSubmissionParametersFile)
+  static YarnClusterSubmissionFromCS fromJobSubmissionParametersFile(final 
File yarnClusterAppSubmissionParametersFile,
+                                                                     final 
File yarnClusterJobSubmissionParametersFile)
       throws IOException {
-    try (final FileInputStream fileInputStream = new 
FileInputStream(yarnClusterJobSubmissionParametersFile)) {
-      // this is mainly a test hook
-      return readYarnClusterSubmissionFromCSFromInputStream(fileInputStream);
+    try (final FileInputStream appFileInputStream = new 
FileInputStream(yarnClusterAppSubmissionParametersFile)) {
+      try (final FileInputStream jobFileInputStream = new 
FileInputStream(yarnClusterJobSubmissionParametersFile)) {
+        // this is mainly a test hook
+        return 
readYarnClusterSubmissionFromCSFromInputStream(appFileInputStream, 
jobFileInputStream);
+      }
     }
   }
 
   static YarnClusterSubmissionFromCS 
readYarnClusterSubmissionFromCSFromInputStream(
-      final InputStream inputStream) throws IOException {
-    final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
-        AvroYarnClusterJobSubmissionParameters.getClassSchema(), inputStream);
-    final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> reader = 
new SpecificDatumReader<>(
+      final InputStream appInputStream, final InputStream jobInputStream) 
throws IOException {
+    final JsonDecoder appDecoder = DecoderFactory.get().jsonDecoder(
+        AvroYarnClusterAppSubmissionParameters.getClassSchema(), 
appInputStream);
+    final SpecificDatumReader<AvroYarnClusterAppSubmissionParameters> 
appReader = new SpecificDatumReader<>(
+        AvroYarnClusterAppSubmissionParameters.class);
+    final AvroYarnClusterAppSubmissionParameters 
yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder);
+
+    final JsonDecoder jobDecoder = DecoderFactory.get().jsonDecoder(
+        AvroYarnClusterJobSubmissionParameters.getClassSchema(), 
jobInputStream);
+    final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> 
jobReader = new SpecificDatumReader<>(
         AvroYarnClusterJobSubmissionParameters.class);
-    final AvroYarnClusterJobSubmissionParameters 
yarnClusterJobSubmissionParameters = reader.read(null, decoder);
-    return new YarnClusterSubmissionFromCS(yarnClusterJobSubmissionParameters);
+    final AvroYarnClusterJobSubmissionParameters 
yarnClusterJobSubmissionParameters = jobReader.read(null, jobDecoder);
+
+    return new YarnClusterSubmissionFromCS(yarnClusterAppSubmissionParameters, 
yarnClusterJobSubmissionParameters);
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 9250462..d43ee8d 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
@@ -68,7 +69,7 @@ public final class YarnJobSubmissionClient {
   private final ClasspathProvider classpath;
   private final SecurityTokenProvider tokenProvider;
   private final List<String> commandPrefixList;
-  private final YarnJobSubmissionParametersFileGenerator 
jobSubmissionParametersGenerator;
+  private final YarnSubmissionParametersFileGenerator 
jobSubmissionParametersGenerator;
 
   @Inject
   YarnJobSubmissionClient(final JobUploader uploader,
@@ -78,7 +79,7 @@ public final class YarnJobSubmissionClient {
                           @Parameter(DriverLaunchCommandPrefix.class)
                           final List<String> commandPrefixList,
                           final SecurityTokenProvider tokenProvider,
-                          final YarnJobSubmissionParametersFileGenerator 
jobSubmissionParametersGenerator) {
+                          final YarnSubmissionParametersFileGenerator 
jobSubmissionParametersGenerator) {
     this.uploader = uploader;
     this.fileNames = fileNames;
     this.yarnConfiguration = yarnConfiguration;
@@ -121,13 +122,24 @@ public final class YarnJobSubmissionClient {
       // 
------------------------------------------------------------------------
       // Upload the JAR
       LOG.info("Uploading job submission JAR");
-      final LocalResource jarFileOnDFS = 
jobFolderOnDFS.uploadAsLocalResource(jarFile);
+      final LocalResource jarFileOnDFS = 
jobFolderOnDFS.uploadAsLocalResource(jarFile, LocalResourceType.ARCHIVE);
       LOG.info("Uploaded job submission JAR");
 
       // 
------------------------------------------------------------------------
+      // Upload the job file
+      final LocalResource jobFileOnDFS = jobFolderOnDFS.uploadAsLocalResource(
+          new File(yarnSubmission.getDriverFolder(), 
fileNames.getYarnBootstrapJobParamFilePath()),
+          LocalResourceType.FILE);
+
+      final List<String> confFiles = new ArrayList<>();
+      confFiles.add(fileNames.getYarnBootstrapJobParamFilePath());
+      confFiles.add(fileNames.getYarnBootstrapAppParamFilePath());
+
+      // 
------------------------------------------------------------------------
       // Submit
       submissionHelper
           .addLocalResource(this.fileNames.getREEFFolderName(), jarFileOnDFS)
+          .addLocalResource(fileNames.getYarnBootstrapJobParamFilePath(), 
jobFileOnDFS)
           .setApplicationName(yarnSubmission.getJobId())
           .setDriverMemory(yarnSubmission.getDriverMemory())
           .setPriority(yarnSubmission.getPriority())
@@ -135,7 +147,7 @@ public final class YarnJobSubmissionClient {
           
.setMaxApplicationAttempts(yarnSubmission.getMaxApplicationSubmissions())
           .setPreserveEvaluators(yarnSubmission.getDriverRecoveryTimeout() > 0)
           .setLauncherClass(YarnBootstrapREEFLauncher.class)
-          .setConfigurationFileName(fileNames.getYarnBootstrapParamFilePath())
+          .setConfigurationFilePaths(confFiles)
           .submit();
       writeDriverHttpEndPoint(yarnSubmission.getDriverFolder(),
           submissionHelper.getStringApplicationId(), jobFolderOnDFS.getPath());
@@ -221,16 +233,23 @@ public final class YarnJobSubmissionClient {
   /**
    * .NET client calls into this main method for job submission.
    * For arguments detail:
-   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File)
+   * @see YarnClusterSubmissionFromCS#fromJobSubmissionParametersFile(File, 
File)
    */
   public static void main(final String[] args) throws InjectionException, 
IOException, YarnException {
     final File jobSubmissionParametersFile = new File(args[0]);
+    final File appSubmissionParametersFile = new File(args[1]);
+
+    if (!(appSubmissionParametersFile.exists() && 
appSubmissionParametersFile.canRead())) {
+      throw new IOException("Unable to open and read " + 
appSubmissionParametersFile.getAbsolutePath());
+    }
+
     if (!(jobSubmissionParametersFile.exists() && 
jobSubmissionParametersFile.canRead())) {
       throw new IOException("Unable to open and read " + 
jobSubmissionParametersFile.getAbsolutePath());
     }
 
     final YarnClusterSubmissionFromCS yarnSubmission =
-        
YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(jobSubmissionParametersFile);
+        YarnClusterSubmissionFromCS.fromJobSubmissionParametersFile(
+            appSubmissionParametersFile, jobSubmissionParametersFile);
 
     LOG.log(Level.INFO, "YARN job submission received from C#: {0}", 
yarnSubmission);
     if (!yarnSubmission.getTokenKind().equalsIgnoreCase("NULL")) {

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
deleted file mode 100644
index ccb5e8b..0000000
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionParametersFileGenerator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.bridge.client;
-
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
-import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
-
-import javax.inject.Inject;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.logging.Logger;
-
-/**
- * Does client side manipulation of driver configuration for YARN runtime.
- */
-final class YarnJobSubmissionParametersFileGenerator {
-  private static final Logger LOG = 
Logger.getLogger(YarnJobSubmissionParametersFileGenerator.class.getName());
-  private final REEFFileNames fileNames;
-
-  @Inject
-  private YarnJobSubmissionParametersFileGenerator(final REEFFileNames 
fileNames) {
-    this.fileNames = fileNames;
-  }
-
-  /**
-   * Writes driver configuration to disk.
-   * @param yarnClusterSubmissionFromCS the information needed to submit 
encode YARN parameters and create the
-   *                                    YARN job for submission from the 
cluster.
-   * @throws IOException
-   */
-  public void writeConfiguration(final YarnClusterSubmissionFromCS 
yarnClusterSubmissionFromCS,
-                                 final JobFolder jobFolderOnDFS) throws 
IOException {
-    final File yarnParametersFile = new 
File(yarnClusterSubmissionFromCS.getDriverFolder(),
-        fileNames.getYarnBootstrapParamFilePath());
-
-    try (final FileOutputStream fileOutputStream = new 
FileOutputStream(yarnParametersFile)) {
-      // this is mainly a test hook.
-      writeAvroYarnJobSubmissionParametersToOutputStream(
-          yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), 
fileOutputStream);
-    }
-  }
-
-  static void writeAvroYarnJobSubmissionParametersToOutputStream(
-      final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
-      final String jobFolderOnDFSPath,
-      final OutputStream outputStream) throws IOException {
-    final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
-        new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
-
-    final AvroYarnJobSubmissionParameters jobSubmissionParameters =
-        yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
-    jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath);
-    final JsonEncoder encoder = 
EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
-        outputStream);
-    datumWriter.write(jobSubmissionParameters, encoder);
-    encoder.flush();
-    outputStream.flush();
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/1137cdee/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
new file mode 100644
index 0000000..506ced6
--- /dev/null
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnSubmissionParametersFileGenerator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.bridge.client;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnAppSubmissionParameters;
+import org.apache.reef.reef.bridge.client.avro.AvroYarnJobSubmissionParameters;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.yarn.client.uploader.JobFolder;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+/**
+ * Does client side manipulation of driver configuration for YARN runtime.
+ */
+final class YarnSubmissionParametersFileGenerator {
+  private static final Logger LOG = 
Logger.getLogger(YarnSubmissionParametersFileGenerator.class.getName());
+  private final REEFFileNames fileNames;
+
+  @Inject
+  private YarnSubmissionParametersFileGenerator(final REEFFileNames fileNames) 
{
+    this.fileNames = fileNames;
+  }
+
+  /**
+   * Writes driver configuration to disk.
+   * @param yarnClusterSubmissionFromCS the information needed to submit 
encode YARN parameters and create the
+   *                                    YARN job for submission from the 
cluster.
+   * @throws IOException
+   */
+  public void writeConfiguration(final YarnClusterSubmissionFromCS 
yarnClusterSubmissionFromCS,
+                                 final JobFolder jobFolderOnDFS) throws 
IOException {
+    final File yarnAppParametersFile = new 
File(yarnClusterSubmissionFromCS.getDriverFolder(),
+        fileNames.getYarnBootstrapAppParamFilePath());
+
+    final File yarnJobParametersFile = new 
File(yarnClusterSubmissionFromCS.getDriverFolder(),
+        fileNames.getYarnBootstrapJobParamFilePath());
+
+    try (final FileOutputStream appFileOutputStream = new 
FileOutputStream(yarnAppParametersFile)) {
+      try (final FileOutputStream jobFileOutputStream = new 
FileOutputStream(yarnJobParametersFile)) {
+        // this is mainly a test hook.
+        
writeAvroYarnAppSubmissionParametersToOutputStream(yarnClusterSubmissionFromCS, 
appFileOutputStream);
+        writeAvroYarnJobSubmissionParametersToOutputStream(
+            yarnClusterSubmissionFromCS, jobFolderOnDFS.getPath().toString(), 
jobFileOutputStream);
+      }
+    }
+  }
+
+  static void writeAvroYarnAppSubmissionParametersToOutputStream(
+      final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+      final OutputStream outputStream) throws IOException {
+    final DatumWriter<AvroYarnAppSubmissionParameters> datumWriter =
+        new SpecificDatumWriter<>(AvroYarnAppSubmissionParameters.class);
+
+    final AvroYarnAppSubmissionParameters appSubmissionParameters =
+        yarnClusterSubmissionFromCS.getYarnAppSubmissionParameters();
+    final JsonEncoder encoder = 
EncoderFactory.get().jsonEncoder(appSubmissionParameters.getSchema(), 
outputStream);
+    datumWriter.write(appSubmissionParameters, encoder);
+    encoder.flush();
+    outputStream.flush();
+  }
+
+  static void writeAvroYarnJobSubmissionParametersToOutputStream(
+      final YarnClusterSubmissionFromCS yarnClusterSubmissionFromCS,
+      final String jobFolderOnDFSPath,
+      final OutputStream outputStream) throws IOException {
+    final DatumWriter<AvroYarnJobSubmissionParameters> datumWriter =
+        new SpecificDatumWriter<>(AvroYarnJobSubmissionParameters.class);
+
+    final AvroYarnJobSubmissionParameters jobSubmissionParameters =
+        yarnClusterSubmissionFromCS.getYarnJobSubmissionParameters();
+    jobSubmissionParameters.setDfsJobSubmissionFolder(jobFolderOnDFSPath);
+    final JsonEncoder encoder = 
EncoderFactory.get().jsonEncoder(jobSubmissionParameters.getSchema(),
+        outputStream);
+    datumWriter.write(jobSubmissionParameters, encoder);
+    encoder.flush();
+    outputStream.flush();
+  }
+}

Reply via email to