Repository: incubator-reef Updated Branches: refs/heads/master 8068dc978 -> b9686cd79
[REEF-324]:Propagate clr tcpprovider bindings to java This addressed the issue by passing specific tcp bindings from clr to java side. JIRA: [REEF-324](https://issues.apache.org/jira/browse/REEF-324) Pull Request: This closes #185 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/b9686cd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/b9686cd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/b9686cd7 Branch: refs/heads/master Commit: b9686cd79d11c22ef99cc5d9f6095a41e08a125b Parents: 8068dc9 Author: Beysim Sezgin <[email protected]> Authored: Wed May 20 17:39:31 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed May 20 17:41:55 2015 -0700 ---------------------------------------------------------------------- .../ClrClient2JavaClientCuratedParameters.cs | 45 ++++++++++++ .../Org.Apache.REEF.Client/Local/LocalClient.cs | 21 ++++-- .../Local/LocalRuntimeClientConfiguration.cs | 33 --------- .../Org.Apache.REEF.Client.csproj | 1 + .../Org.Apache.REEF.Client/YARN/YARNClient.cs | 13 +++- .../YARN/YARNClientConfiguration.cs | 36 --------- .../Remote/Impl/DefaultRemoteManager.cs | 4 + .../apache/reef/bridge/client/LocalClient.java | 62 +++++++++++++--- .../bridge/client/YarnJobSubmissionClient.java | 37 +++++++--- .../reef/io/TcpPortConfigurationProvider.java | 77 ++++++++++++++++++++ .../reef/io/network/naming/NameServerImpl.java | 9 ++- .../local/client/LocalRuntimeConfiguration.java | 19 +---- .../yarn/client/YarnClientConfiguration.java | 20 +---- .../wake/remote/ports/RangeTcpPortProvider.java | 15 +--- .../reef/wake/remote/ports/TcpPortProvider.java | 10 +-- 15 files changed, 248 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs new file mode 100644 index 0000000..e40ab59 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Client/Common/ClrClient2JavaClientCuratedParameters.cs @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Client.Common +{ + internal class ClrClient2JavaClientCuratedParameters + { + public int TcpPortRangeStart { get; private set; } + public int TcpPortRangeCount { get; private set; } + public int TcpPortRangeTryCount { get; private set; } + public int TcpPortRangeSeed { get; private set; } + + + [Inject] + private ClrClient2JavaClientCuratedParameters( + [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart, + [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount, + [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount, + [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeSeed) + { + TcpPortRangeStart = tcpPortRangeStart; + TcpPortRangeCount = tcpPortRangeCount; + TcpPortRangeTryCount = tcpPortRangeTryCount; + TcpPortRangeSeed = tcpPortRangeSeed; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs index 3c4316c..518533e 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalClient.cs @@ -19,10 +19,12 @@ using System; using System.IO; +using System.Linq; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Client.Local.Parameters; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Client.Local @@ -42,7 +44,7 @@ namespace Org.Apache.REEF.Client.Local /// </summary> private const string DriverFolderName = "driver"; - private static readonly Logger Logger = Logger.GetLogger(typeof (LocalClient)); + private static readonly Logger Logger = Logger.GetLogger(typeof(LocalClient)); private readonly DriverFolderPreparationHelper _driverFolderPreparationHelper; private readonly JavaClientLauncher _javaClientLauncher; private readonly int _numberOfEvaluators; @@ -50,8 +52,8 @@ namespace Org.Apache.REEF.Client.Local [Inject] private LocalClient(DriverFolderPreparationHelper driverFolderPreparationHelper, - [Parameter(typeof (LocalRuntimeDirectory))] string runtimeFolder, - [Parameter(typeof (NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) + [Parameter(typeof(LocalRuntimeDirectory))] string runtimeFolder, + [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) { _driverFolderPreparationHelper = driverFolderPreparationHelper; _runtimeFolder = runtimeFolder; @@ -68,7 +70,7 @@ namespace Org.Apache.REEF.Client.Local [Inject] private LocalClient( DriverFolderPreparationHelper driverFolderPreparationHelper, - [Parameter(typeof (NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) + [Parameter(typeof(NumberOfEvaluators))] int numberOfEvaluators, JavaClientLauncher javaClientLauncher) : this(driverFolderPreparationHelper, Path.GetTempPath(), numberOfEvaluators, javaClientLauncher) { // Intentionally left blank. @@ -83,8 +85,17 @@ namespace Org.Apache.REEF.Client.Local _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolder); + //TODO: Remove this when we have a generalized way to pass config to java + var javaParams = TangFactory.GetTang() + .NewInjector(jobSubmission.DriverConfigurations.ToArray()) + .GetInstance<ClrClient2JavaClientCuratedParameters>(); + _javaClientLauncher.Launch(JavaClassName, driverFolder, jobSubmission.JobIdentifier, - _numberOfEvaluators.ToString()); + _numberOfEvaluators.ToString(), + javaParams.TcpPortRangeStart.ToString(), + javaParams.TcpPortRangeCount.ToString(), + javaParams.TcpPortRangeTryCount.ToString() + ); Logger.Log(Level.Info, "Submitted the Driver for execution."); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs index 2504a5d..20c3c98 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs @@ -20,9 +20,7 @@ using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Local.Parameters; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.Local { @@ -47,41 +45,10 @@ namespace Org.Apache.REEF.Client.Local /// </remarks> public static readonly OptionalParameter<string> RuntimeFolder = new OptionalParameter<string>(); - /// <summary> - /// Configuration provides whose Configuration will be merged into all Driver Configuration. - /// </summary> - public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider = - new OptionalImpl<IConfigurationProvider>(); - - /// <summary> - /// Start of the tcp port range for listening. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>(); - - /// <summary> - /// Number of port for the tcp port range for listening. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>(); - - /// <summary> - /// Max number of times we will deliver a port from the tcp port range. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>(); - - /// <summary> - /// Seed for the number number for determining which particular port to deliver from the range - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>(); - public static ConfigurationModule ConfigurationModule = new LocalRuntimeClientConfiguration() .BindImplementation(GenericType<IREEFClient>.Class, GenericType<LocalClient>.Class) .BindNamedParameter(GenericType<LocalRuntimeDirectory>.Class, RuntimeFolder) .BindNamedParameter(GenericType<NumberOfEvaluators>.Class, NumberOfEvaluators) - .BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider) - .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter) - .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter) - .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter) - .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index fde24ed..0deb829 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -50,6 +50,7 @@ under the License. <Compile Include="API\JobSubmissionBuilderFactory.cs" /> <Compile Include="CLRBridgeClient.cs" /> <Compile Include="Common\ClientConstants.cs" /> + <Compile Include="Common\ClrClient2JavaClientCuratedParameters.cs" /> <Compile Include="Common\DriverFolderPreparationHelper.cs" /> <Compile Include="Common\FileSets.cs" /> <Compile Include="Common\JavaClientLauncher.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs index 93a53a0..b6fcc0c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClient.cs @@ -19,9 +19,11 @@ using System; using System.IO; +using System.Linq; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Common; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Client.YARN @@ -55,9 +57,18 @@ namespace Org.Apache.REEF.Client.YARN _driverFolderPreparationHelper.PrepareDriverFolder(jobSubmission, driverFolderPath); + //TODO: Remove this when we have a generalized way to pass config to java + var javaParams = TangFactory.GetTang() + .NewInjector(jobSubmission.DriverConfigurations.ToArray()) + .GetInstance<ClrClient2JavaClientCuratedParameters>(); + // Submit the driver _javaClientLauncher.Launch(JavaClassName, driverFolderPath, jobSubmission.JobIdentifier, - jobSubmission.DriverMemory.ToString()); + jobSubmission.DriverMemory.ToString(), + javaParams.TcpPortRangeStart.ToString(), + javaParams.TcpPortRangeCount.ToString(), + javaParams.TcpPortRangeTryCount.ToString() + ); Logger.Log(Level.Info, "Submitted the Driver for execution."); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs index 69c4b29..ad53d0c 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs @@ -17,13 +17,8 @@ * under the License. */ using Org.Apache.REEF.Client.API; -using Org.Apache.REEF.Client.Local.Parameters; -using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.YARN { @@ -32,39 +27,8 @@ namespace Org.Apache.REEF.Client.YARN /// </summary> public sealed class YARNClientConfiguration : ConfigurationModuleBuilder { - /// <summary> - /// Configuration provides whose Configuration will be merged into all Driver Configuration. - /// </summary> - public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider = new OptionalImpl<IConfigurationProvider>(); - - /// <summary> - /// Start of the tcp port range for listening. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>(); - - /// <summary> - /// Number of port for the tcp port range for listening. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>(); - - /// <summary> - /// Max number of times we will deliver a port from the tcp port range. - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>(); - - /// <summary> - /// Seed for the number number for determining which particular port to deliver from the range - /// </summary> - public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>(); - - public static ConfigurationModule ConfigurationModule = new YARNClientConfiguration() .BindImplementation(GenericType<IREEFClient>.Class, GenericType<YARNClient>.Class) - .BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider) - .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter) - .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter) - .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter) - .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs index de577d1..0290029 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -198,6 +198,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { TransportClient<IRemoteEvent<T>> client = new TransportClient<IRemoteEvent<T>>(remoteEndpoint, _codec, _observerContainer); + LOGGER.Log(Level.Info, + String.Format("NewClientConnection: Local {0} connected to Remote {1}", + client.Link.LocalEndpoint.ToString(), + client.Link.RemoteEndpoint.ToString())); remoteObserver = new ProxyObserver(client); _cachedClients[remoteEndpoint] = remoteObserver; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java index e8d4fdf..389e3bf 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/LocalClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,19 +18,25 @@ */ package org.apache.reef.bridge.client; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.local.client.DriverConfigurationProvider; import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; import org.apache.reef.runtime.local.client.PreparedDriverFolderLauncher; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Tang; +import org.apache.reef.tang.*; +import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.AvroConfigurationSerializer; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; import javax.inject.Inject; import java.io.File; import java.io.IOException; +import java.util.Set; /** * Submits a folder containing a Driver to the local runtime. @@ -42,16 +48,19 @@ public class LocalClient { private final PreparedDriverFolderLauncher launcher; private final REEFFileNames fileNames; private final DriverConfigurationProvider driverConfigurationProvider; + private final Set<ConfigurationProvider> configurationProviders; @Inject public LocalClient(final AvroConfigurationSerializer configurationSerializer, final PreparedDriverFolderLauncher launcher, final REEFFileNames fileNames, - final DriverConfigurationProvider driverConfigurationProvider) { + final DriverConfigurationProvider driverConfigurationProvider, + final @Parameter(DriverConfigurationProviders.class) Set<ConfigurationProvider> configurationProviders) { this.configurationSerializer = configurationSerializer; this.launcher = launcher; this.fileNames = fileNames; this.driverConfigurationProvider = driverConfigurationProvider; + this.configurationProviders = configurationProviders; } public void submit(final File jobFolder, final String jobId) throws IOException { @@ -64,8 +73,17 @@ public class LocalClient { throw new IOException("The Driver folder " + driverFolder.getAbsolutePath() + " doesn't exist."); } - final Configuration driverConfiguration = driverConfigurationProvider + final Configuration driverConfiguration1 = driverConfigurationProvider .getDriverConfiguration(jobFolder, CLIENT_REMOTE_ID, jobId, Constants.DRIVER_CONFIGURATION_WITH_HTTP_AND_NAMESERVER); + final ConfigurationBuilder configurationBuilder = Tang.Factory.getTang().newConfigurationBuilder(); + for (final ConfigurationProvider configurationProvider : this.configurationProviders) { + configurationBuilder.addConfiguration(configurationProvider.getConfiguration()); + } + final Configuration providedConfigurations = configurationBuilder.build(); + final Configuration driverConfiguration = Configurations.merge( + driverConfiguration1, + providedConfigurations); + final File driverConfigurationFile = new File(driverFolder, fileNames.getDriverConfigurationPath()); configurationSerializer.toFile(driverConfiguration, driverConfigurationFile); launcher.launch(driverFolder, jobId, CLIENT_REMOTE_ID); @@ -77,15 +95,16 @@ public class LocalClient { // We assume the given path to be the one of the driver. The job folder is one level up from there. final File jobFolder = new File(args[0]).getParentFile(); - // The job identifier + final String runtimeRootFolder = jobFolder.getParentFile().getAbsolutePath(); final String jobId = args[1]; // The number of evaluators the local runtime can create final int numberOfEvaluators = Integer.valueOf(args[2]); + final int tcpBeginPort = Integer.valueOf(args[3]); + final int tcpRangeCount = Integer.valueOf(args[4]); + final int tcpTryCount = Integer.valueOf(args[5]); - final Configuration runtimeConfiguration = LocalRuntimeConfiguration.CONF - .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, numberOfEvaluators) - .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, jobFolder.getParentFile().getAbsolutePath()) - .build(); + + final Configuration runtimeConfiguration = getRuntimeConfiguration(numberOfEvaluators, runtimeRootFolder, tcpBeginPort, tcpRangeCount, tcpTryCount); final LocalClient client = Tang.Factory.getTang() .newInjector(runtimeConfiguration) @@ -93,4 +112,27 @@ public class LocalClient { client.submit(jobFolder, jobId); } + + private static Configuration getRuntimeConfiguration( + int numberOfEvaluators, + String runtimeRootFolder, + int tcpBeginPort, + int tcpRangeCount, + int tcpTryCount) { + final Configuration runtimeConfiguration = getRuntimeConfiguration(numberOfEvaluators, runtimeRootFolder); + final Configuration userproviderConfiguration = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) + .build(); + return Configurations.merge(runtimeConfiguration, userproviderConfiguration); + } + + private static Configuration getRuntimeConfiguration(int numberOfEvaluators, String runtimeRootFolder) { + return LocalRuntimeConfiguration.CONF + .set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS, Integer.toString(numberOfEvaluators)) + .set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder) + .build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java index bd4f009..248375a 100644 --- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java +++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,6 +21,8 @@ package org.apache.reef.bridge.client; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; @@ -29,12 +31,13 @@ import org.apache.reef.runtime.yarn.client.YarnSubmissionHelper; import org.apache.reef.runtime.yarn.client.uploader.JobFolder; import org.apache.reef.runtime.yarn.client.uploader.JobUploader; import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Configurations; -import org.apache.reef.tang.Tang; +import org.apache.reef.tang.*; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.tang.formats.ConfigurationSerializer; import org.apache.reef.util.JARFileMaker; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; import javax.inject.Inject; import java.io.File; @@ -151,16 +154,32 @@ public final class YarnJobSubmissionClient { } } + private static Configuration getRuntimeConfiguration(int tcpBeginPort, int tcpRangeCount, int tcpTryCount) { + Configuration yarnClientConfig = YarnClientConfiguration.CONF + .build(); + + Configuration providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class) + .bindNamedParameter(TcpPortRangeBegin.class, Integer.toString(tcpBeginPort)) + .bindNamedParameter(TcpPortRangeCount.class, Integer.toString(tcpRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, Integer.toString(tcpTryCount)) + .build(); + + return Configurations.merge(yarnClientConfig, providerConfig); + } + public static void main(final String[] args) throws InjectionException, IOException, YarnException { final File driverFolder = new File(args[0]); final String jobId = args[1]; final int driverMemory = Integer.valueOf(args[2]); - + final int tcpBeginPort = Integer.valueOf(args[3]); + final int tcpRangeCount = Integer.valueOf(args[4]); + final int tcpTryCount = Integer.valueOf(args[5]); // Static for now final int priority = 1; final String queue = "default"; - final Configuration yarnConfiguration = YarnClientConfiguration.CONF.build(); + final Configuration yarnConfiguration = getRuntimeConfiguration(tcpBeginPort, tcpRangeCount, tcpTryCount); final YarnJobSubmissionClient client = Tang.Factory.getTang() .newInjector(yarnConfiguration) .getInstance(YarnJobSubmissionClient.class); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-common/src/main/java/org/apache/reef/io/TcpPortConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/io/TcpPortConfigurationProvider.java b/lang/java/reef-common/src/main/java/org/apache/reef/io/TcpPortConfigurationProvider.java new file mode 100644 index 0000000..19f311c --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/io/TcpPortConfigurationProvider.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io; + +import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.ConfigurationProvider; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Implements ConfigurationProvider for RangeTcpPortProvider + */ +public class TcpPortConfigurationProvider implements ConfigurationProvider { + private final int portRangeBegin; + private final int portRangeCount; + private final int portRangeTryCount; + private static final Logger LOG = Logger.getLogger(TcpPortConfigurationProvider.class.getName()); + + @Inject + TcpPortConfigurationProvider(final @Parameter(TcpPortRangeBegin.class) int portRangeBegin, + final @Parameter(TcpPortRangeCount.class) int portRangeCount, + final @Parameter(TcpPortRangeTryCount.class) int portRangeTryCount) { + this.portRangeBegin = portRangeBegin; + this.portRangeCount = portRangeCount; + this.portRangeTryCount = portRangeTryCount; + LOG.log(Level.INFO, "Instantiating " + this.toString()); + } + + /** + * returns a configuration for the class that implements TcpPortProvider so that class can be instantiated + * somewhere else + * + * @return Configuration. + */ + @Override + public Configuration getConfiguration() { + return Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(TcpPortRangeBegin.class, String.valueOf(portRangeBegin)) + .bindNamedParameter(TcpPortRangeCount.class, String.valueOf(portRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, String.valueOf(portRangeTryCount)) + .bindSetEntry(EvaluatorConfigurationProviders.class, TcpPortConfigurationProvider.class) + .build(); + } + + @Override + public String toString() { + return "TcpPortConfigurationProvider{" + + "portRangeBegin=" + portRangeBegin + + ", portRangeCount=" + portRangeCount + + ", portRangeTryCount=" + portRangeTryCount + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index 6fdd0b5..c541c73 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -30,6 +30,8 @@ import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; import org.apache.reef.webserver.AvroReefServiceInfo; @@ -98,7 +100,7 @@ public class NameServerImpl implements NameServer { final int port, final IdentifierFactory factory, final ReefEventStateManager reefEventStateManager) { - this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance()); + this(port, factory, reefEventStateManager, LocalAddressProviderFactory.getInstance(), RangeTcpPortProvider.Default); } /** @@ -113,7 +115,8 @@ public class NameServerImpl implements NameServer { final @Parameter(NameServerParameters.NameServerPort.class) int port, final @Parameter(NameServerParameters.NameServerIdentifierFactory.class) IdentifierFactory factory, final ReefEventStateManager reefEventStateManager, - final LocalAddressProvider localAddressProvider) { + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider) { this.localAddressProvider = localAddressProvider; @@ -122,7 +125,7 @@ public class NameServerImpl implements NameServer { final EventHandler<NamingMessage> handler = createEventHandler(codec); this.transport = new NettyMessagingTransport(localAddressProvider.getLocalAddress(), port, null, - new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000); + new SyncStage<>(new NamingServerHandler(handler, codec)), 3, 10000, tcpPortProvider); this.port = transport.getListeningPort(); this.idToAddrMap = Collections.synchronizedMap(new HashMap<Identifier, InetSocketAddress>()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java index accb6b0..9a882cf 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java @@ -19,7 +19,6 @@ package org.apache.reef.runtime.local.client; import org.apache.reef.client.parameters.DriverConfigurationProviders; -import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; @@ -27,12 +26,12 @@ import org.apache.reef.runtime.common.parameters.JVMHeapSlack; import org.apache.reef.runtime.local.LocalClasspathProvider; import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators; import org.apache.reef.runtime.local.client.parameters.RootFolder; +import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.formats.ConfigurationModule; import org.apache.reef.tang.formats.ConfigurationModuleBuilder; import org.apache.reef.tang.formats.OptionalImpl; import org.apache.reef.tang.formats.OptionalParameter; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.ports.TcpPortProvider; import java.util.concurrent.ExecutorService; @@ -67,19 +66,8 @@ public class LocalRuntimeConfiguration extends ConfigurationModuleBuilder { */ public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); - /** - * The class used to resolve the local address for Wake and HTTP to bind to. - * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that - * the Driver (and the Evaluators) also use it. - */ - public static final OptionalImpl<LocalAddressProvider> LOCAL_ADDRESS_PROVIDER = new OptionalImpl<>(); - /** - * The class used to restrict tcp port ranges for listening - * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that - * the Driver (and the Evaluators) also use it. - */ - public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new OptionalImpl<>(); + /** * The ConfigurationModule for the local resourcemanager. @@ -95,9 +83,6 @@ public class LocalRuntimeConfiguration extends ConfigurationModuleBuilder { .bindNamedParameter(RootFolder.class, RUNTIME_ROOT_FOLDER) .bindNamedParameter(JVMHeapSlack.class, JVM_HEAP_SLACK) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) - // Bind LocalAddressProvider - .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER) - .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER) .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java index 115484b..15f7435 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java @@ -21,7 +21,6 @@ package org.apache.reef.runtime.yarn.client; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; import org.apache.reef.client.parameters.DriverConfigurationProviders; -import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.runtime.common.client.CommonRuntimeConfiguration; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.RuntimeClasspathProvider; @@ -30,13 +29,13 @@ import org.apache.reef.runtime.yarn.YarnClasspathProvider; import org.apache.reef.runtime.yarn.client.parameters.JobPriority; import org.apache.reef.runtime.yarn.client.parameters.JobQueue; import org.apache.reef.runtime.yarn.util.YarnConfigurationConstructor; +import org.apache.reef.tang.ConfigurationProvider; import org.apache.reef.tang.formats.ConfigurationModule; import org.apache.reef.tang.formats.ConfigurationModuleBuilder; import org.apache.reef.tang.formats.OptionalImpl; import org.apache.reef.tang.formats.OptionalParameter; import org.apache.reef.util.logging.LoggingSetup; import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.ports.TcpPortProvider; /** * A ConfigurationModule for the YARN resourcemanager. @@ -58,20 +57,6 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder { */ public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); - /** - * The class used to restrict tcp port ranges for listening - * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that - * the Driver (and the Evaluators) also use it. - */ - public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new OptionalImpl<>(); - - /** - * The class used to resolve the local address for Wake and HTTP to bind to. - * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that - * the Driver (and the Evaluators) also use it. - */ - public static final OptionalImpl<LocalAddressProvider> LOCAL_ADDRESS_PROVIDER = new OptionalImpl<>(); - public static final ConfigurationModule CONF = new YarnClientConfiguration() .merge(CommonRuntimeConfiguration.CONF) // Bind YARN @@ -84,9 +69,6 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder { // Bind external constructors. Taken from YarnExternalConstructors.registerClientConstructors .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) - // Bind LocalAddressProvider - .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER) - .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java index 64b8de4..246270b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java @@ -29,12 +29,13 @@ import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; import javax.inject.Inject; import java.util.Iterator; +import java.util.logging.Level; import java.util.logging.Logger; /** * A TcpPortProvider which gives out random ports in a range */ -public final class RangeTcpPortProvider implements TcpPortProvider, ConfigurationProvider { +public final class RangeTcpPortProvider implements TcpPortProvider { private final int portRangeBegin; private final int portRangeCount; private final int portRangeTryCount; @@ -47,6 +48,7 @@ public final class RangeTcpPortProvider implements TcpPortProvider, Configuratio this.portRangeBegin = portRangeBegin; this.portRangeCount = portRangeCount; this.portRangeTryCount = portRangeTryCount; + LOG.log(Level.FINE, "Instantiating " + this); } /** @@ -68,17 +70,6 @@ public final class RangeTcpPortProvider implements TcpPortProvider, Configuratio Integer.parseInt(TcpPortRangeCount.default_value), Integer.parseInt(TcpPortRangeTryCount.default_value)); - - @Override - public Configuration getConfiguration() { - return Tang.Factory.getTang().newConfigurationBuilder() - .bindNamedParameter(TcpPortRangeBegin.class, String.valueOf(portRangeBegin)) - .bindNamedParameter(TcpPortRangeCount.class, String.valueOf(portRangeCount)) - .bindNamedParameter(TcpPortRangeTryCount.class, String.valueOf(portRangeTryCount)) - .bindImplementation(TcpPortProvider.class, RangeTcpPortProvider.class) - .build(); - } - @Override public String toString() { return "RangeTcpPortProvider{" + http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/b9686cd7/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java index 566b62b..1b4803c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -35,12 +35,4 @@ public interface TcpPortProvider extends Iterable<Integer> { */ @Override Iterator<Integer> iterator(); - - /** - * returns a configuration for the class that implements TcpPortProvider so that class can be instantiated - * somewhere else - * - * @return Configuration. - */ - Configuration getConfiguration(); }
