Repository: reef Updated Branches: refs/heads/master 35dc55eb6 -> e18b5d36a
[REEF-1895] REEF Bridge performance improvement for allocated evaluators * Remove synchronize in JobDriver for allocated evaluator/context handlers * Remove match code in AllocatedEvaluator in bridge as it is not used * Bridge code change to improve code runtime reuse * Reduce the cross bridge calls in AllocatedEvaluatorClr2Java * Adding performance test JIRA: [REEF-1895](https://issues.apache.org/jira/browse/REEF-1895) Pull Request: This closes #1385 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/e18b5d36 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/e18b5d36 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/e18b5d36 Branch: refs/heads/master Commit: e18b5d36ad714a877e2806f1c48f56401d6132da Parents: 35dc55e Author: Julia Wang <[email protected]> Authored: Mon Oct 9 20:19:31 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Oct 17 15:34:42 2017 -0700 ---------------------------------------------------------------------- .../AllocatedEvaluatorClr2Java.cpp | 29 +-- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 6 +- lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp | 13 +- .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 6 +- .../Context/ContextConfiguration.cs | 37 +++- .../Tasks/TaskConfiguration.cs | 41 +++-- .../Bridge/ClrSystemHandlerWrapper.cs | 4 +- .../Bridge/DriverBridge.cs | 23 ++- .../Bridge/Events/AllocatedEvaluator.cs | 43 +---- .../Org.Apache.REEF.Tests.csproj | 3 + .../TestHelloREEF/TestHelloDriver.cs | 158 ++++++++++++++++ .../TestHelloREEF/TestHelloREEFClient.cs | 181 +++++++++++++++++++ .../Performance/TestHelloREEF/TestHelloTask.cs | 45 +++++ .../Org.Apache.REEF.Utilities/Logging/Logger.cs | 4 +- .../javabridge/AllocatedEvaluatorBridge.java | 8 + .../apache/reef/javabridge/NativeInterop.java | 4 +- .../reef/javabridge/generic/JobDriver.java | 91 +++++----- .../yarn/client/YarnSubmissionHelper.java | 3 +- 18 files changed, 550 insertions(+), 149 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp index 387888c..af89bea 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp @@ -30,7 +30,7 @@ namespace Org { static BridgeLogger^ LOGGER = BridgeLogger::GetLogger("<C++>"); }; - AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator) { + AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator, jstring nameServerInfo, jstring evaluatorId) { ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java"); @@ -40,11 +40,8 @@ namespace Org { } _jobjectAllocatedEvaluator = reinterpret_cast<jobject>(env->NewGlobalRef(jallocatedEvaluator)); - jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); - _jstringId = CommonUtilities::GetJObjectId(env, _jobjectAllocatedEvaluator, jclassAllocatedEvaluator); - - jmethodID jmidGetNameServerInfo = env->GetMethodID(jclassAllocatedEvaluator, "getNameServerInfo", "()Ljava/lang/String;"); - _jstringNameServerInfo = CommonUtilities::CallGetMethodNewGlobalRef<jstring>(env, _jobjectAllocatedEvaluator, jmidGetNameServerInfo); + _evaluatorId = ManagedStringFromJavaString(env, evaluatorId); + _nameServerInfo = ManagedStringFromJavaString(env, nameServerInfo); ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::AllocatedEvaluatorClr2Java"); } @@ -58,14 +55,6 @@ namespace Org { if (_jobjectAllocatedEvaluator != NULL) { env->DeleteGlobalRef(_jobjectAllocatedEvaluator); } - - if (_jstringId != NULL) { - 
env->DeleteGlobalRef(_jstringId); - } - - if (_jstringNameServerInfo != NULL) { - env->DeleteGlobalRef(_jstringNameServerInfo); - } } void AllocatedEvaluatorClr2Java::SubmitContext(String^ evaluatorConfigStr, String^ contextConfigStr) { @@ -87,7 +76,7 @@ namespace Org { } void AllocatedEvaluatorClr2Java::SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr) { - ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndTask"); + ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndTask" + taskConfigStr); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); jmethodID jmidSubmitContextAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndTaskString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V"); @@ -102,7 +91,7 @@ namespace Org { JavaStringFromManagedString(env, evaluatorConfigStr), JavaStringFromManagedString(env, contextConfigStr), JavaStringFromManagedString(env, taskConfigStr)); - ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndTask"); + ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndTask" + taskConfigStr); } void AllocatedEvaluatorClr2Java::SubmitContextAndService(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr) { @@ -166,15 +155,11 @@ namespace Org { } String^ AllocatedEvaluatorClr2Java::GetId() { - ManagedLog::LOGGER->Log("AllocatedEvaluatorClr2Java::GetId"); - JNIEnv *env = RetrieveEnv(_jvm); - return ManagedStringFromJavaString(env, _jstringId); + return _evaluatorId; } String^ AllocatedEvaluatorClr2Java::GetNameServerInfo() { - ManagedLog::LOGGER->Log("AllocatedEvaluatorClr2Java::GetNameServerInfo"); - JNIEnv *env = RetrieveEnv(_jvm); - return ManagedStringFromJavaString(env, _jstringNameServerInfo); + return _nameServerInfo; } IEvaluatorDescriptor^ AllocatedEvaluatorClr2Java::GetEvaluatorDescriptor() { 
http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index cf4b947..5fa16e7 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -56,10 +56,10 @@ namespace Org { public ref class AllocatedEvaluatorClr2Java : public IAllocatedEvaluatorClr2Java { jobject _jobjectAllocatedEvaluator = NULL; JavaVM* _jvm; - jstring _jstringId = NULL; - jstring _jstringNameServerInfo = NULL; + String^ _evaluatorId; + String^ _nameServerInfo; public: - AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator); + AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator, jstring nameServerInfo, jstring evaluatorId); ~AllocatedEvaluatorClr2Java(); !AllocatedEvaluatorClr2Java(); virtual void SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr); http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp index 4402c44..9618485 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp @@ -135,13 +135,16 @@ jbyteArray JavaByteArrayFromManagedByteArray( return NULL; } +thread_local JNIEnv *t_env = NULL; JNIEnv* RetrieveEnv(JavaVM* jvm) { - JNIEnv *env; - if (jvm->AttachCurrentThread((void **) &env, NULL) != 0) { - ManagedLog::LOGGER->Log("cannot attach jni env to current jvm thread."); - throw; + if (NULL == t_env) + { + if (jvm->AttachCurrentThread((void **)&t_env, NULL) != 0) { + ManagedLog::LOGGER->Log("cannot attach jni env to current jvm thread."); + throw; + } } - 
return env; + return t_env; } String^ FormatJavaExceptionMessage(String^ errorMessage, Exception^ exception, int recursionDepth) { http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index 25a4c77..40611f7 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -155,9 +155,9 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSyst * Signature: (JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V */ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext -(JNIEnv *env, jclass cls, jlong handle, jobject jallocatedEvaluatorBridge, jobject jlogger) { +(JNIEnv *env, jclass cls, jlong handle, jobject jallocatedEvaluatorBridge, jobject jlogger, jstring nameServerInfo, jstring evaluatorId) { ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext:"); - AllocatedEvaluatorClr2Java^ allocatedEval = gcnew AllocatedEvaluatorClr2Java(env, jallocatedEvaluatorBridge); + AllocatedEvaluatorClr2Java^ allocatedEval = gcnew AllocatedEvaluatorClr2Java(env, jallocatedEvaluatorBridge, nameServerInfo, evaluatorId); try { ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext(handle, allocatedEval); } @@ -600,7 +600,7 @@ static JNINativeMethod methods[] = { { "callClrSystemOnStartHandler", "()V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler }, - { "clrSystemAllocatedEvaluatorHandlerOnNext", "(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V", + { "clrSystemAllocatedEvaluatorHandlerOnNext", 
"(JLorg/apache/reef/javabridge/AllocatedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;Ljava/lang/String;Ljava/lang/String;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemAllocatedEvaluatorHandlerOnNext }, { "clrSystemActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;Lorg/apache/reef/javabridge/InteropLogger;)V", http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs index 115dc4a..721f287 100644 --- a/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Common/Context/ContextConfiguration.cs @@ -17,6 +17,7 @@ using System; using System.Diagnostics.CodeAnalysis; +using System.Net.NetworkInformation; using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; @@ -73,19 +74,37 @@ namespace Org.Apache.REEF.Common.Context [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] public static readonly OptionalImpl<IContextMessageHandler> OnMessage = new OptionalImpl<IContextMessageHandler>(); + private static ConfigurationModule contextConfig; + + private static readonly object ConfigLock = new object(); + public static ConfigurationModule ConfigurationModule { get { - return new ContextConfiguration() - .BindNamedParameter(GenericType<ContextConfigurationOptions.ContextIdentifier>.Class, Identifier) - .BindSetEntry(GenericType<ContextConfigurationOptions.StartHandlers>.Class, OnContextStart) - .BindSetEntry(GenericType<ContextConfigurationOptions.StopHandlers>.Class, OnContextStop) - .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageSources>.Class, 
OnSendMessage) - .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageHandlers>.Class, OnMessage) - .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) - .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) - .Build(); + if (contextConfig == null) + { + lock (ConfigLock) + { + if (contextConfig == null) + { + contextConfig = new ContextConfiguration() + .BindNamedParameter(GenericType<ContextConfigurationOptions.ContextIdentifier>.Class, + Identifier) + .BindSetEntry(GenericType<ContextConfigurationOptions.StartHandlers>.Class, + OnContextStart) + .BindSetEntry(GenericType<ContextConfigurationOptions.StopHandlers>.Class, OnContextStop) + .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageSources>.Class, + OnSendMessage) + .BindSetEntry(GenericType<ContextConfigurationOptions.ContextMessageHandlers>.Class, + OnMessage) + .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) + .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) + .Build(); + } + } + } + return contextConfig; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs index 34c683b..8f9ae2d 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs @@ -87,22 +87,39 @@ namespace Org.Apache.REEF.Common.Tasks [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>(); + private static ConfigurationModule taskConfig; + + private static readonly object 
ConfigLock = new object(); + public static ConfigurationModule ConfigurationModule { get { - return new TaskConfiguration() - .BindImplementation(GenericType<ITask>.Class, Task) - .BindSetEntry(GenericType<TaskConfigurationOptions.TaskMessageSources>.Class, OnSendMessage) - .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage) - .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class, OnDriverConnectionChanged) - .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier) - .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento) - .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose) - .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, OnSuspend) - .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) - .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) - .Build(); + if (taskConfig == null) + { + lock (ConfigLock) + { + if (taskConfig == null) + { + taskConfig = new TaskConfiguration() + .BindImplementation(GenericType<ITask>.Class, Task) + .BindSetEntry(GenericType<TaskConfigurationOptions.TaskMessageSources>.Class, + OnSendMessage) + .BindImplementation(GenericType<IDriverMessageHandler>.Class, OnMessage) + .BindImplementation(GenericType<IDriverConnectionMessageHandler>.Class, + OnDriverConnectionChanged) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Identifier>.Class, Identifier) + .BindNamedParameter(GenericType<TaskConfigurationOptions.Memento>.Class, Memento) + .BindNamedParameter(GenericType<TaskConfigurationOptions.CloseHandler>.Class, OnClose) + .BindNamedParameter(GenericType<TaskConfigurationOptions.SuspendHandler>.Class, + OnSuspend) + .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStart) + .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) + .Build(); + } + } + } 
+ return taskConfig; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs index 0895aa0..b203b8a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs @@ -40,11 +40,11 @@ namespace Org.Apache.REEF.Driver.Bridge public static void Call_ClrSystemAllocatedEvaluatorHandler_OnNext(ulong handle, IAllocatedEvaluatorClr2Java clr2Java) { - using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext")) + using (LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemAllocatedEvaluatorHandler_OnNext", clr2Java.GetId())) { GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle); ClrSystemHandler<IAllocatedEvaluator> obj = (ClrSystemHandler<IAllocatedEvaluator>)gc.Target; - obj.OnNext(new AllocatedEvaluator(clr2Java, _driverBridge.ConfigurationProviders)); + obj.OnNext(new AllocatedEvaluator(clr2Java, _driverBridge.ConfigurationStringForProviders)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs index dd30462..8d2bf29 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs @@ -19,6 +19,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Globalization; +using System.Linq; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Evaluator; using 
Org.Apache.REEF.Common.Evaluator.DriverConnectionConfigurationProviders; @@ -32,6 +33,8 @@ using Org.Apache.REEF.Common.Evaluator.Parameters; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Bridge.Events; using Org.Apache.REEF.Driver.Defaults; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -117,7 +120,7 @@ namespace Org.Apache.REEF.Driver.Bridge private readonly HttpServerHandler _httpServerHandler; - private readonly ISet<IConfigurationProvider> _configurationProviders; + private readonly string _configurationProviderString; private readonly IProgressProvider _progressProvider; @@ -148,7 +151,8 @@ namespace Org.Apache.REEF.Driver.Bridge IDriverReconnConfigProvider driverReconnConfigProvider, IDriverConnection driverConnection, HttpServerHandler httpServerHandler, - IProgressProvider progressProvider) + IProgressProvider progressProvider, + AvroConfigurationSerializer serializer) { foreach (TraceListener listener in traceListeners) { @@ -188,10 +192,10 @@ namespace Org.Apache.REEF.Driver.Bridge _driverRestartFailedEvaluatorHandlers = driverRestartFailedEvaluatorHandlers; _httpServerHandler = httpServerHandler; - _configurationProviders = new HashSet<IConfigurationProvider>(configurationProviders) { driverReconnConfigProvider }; + var configurationProviderSet = new HashSet<IConfigurationProvider>(configurationProviders) { driverReconnConfigProvider }; + _configurationProviderString = serializer.ToString(Configurations.Merge(configurationProviderSet.Select(x => x.GetConfiguration()).ToArray())); + _progressProvider = progressProvider; - _progressProvider = progressProvider; - _allocatedEvaluatorSubscriber = new ClrSystemHandler<IAllocatedEvaluator>(); _completedEvaluatorSubscriber = new ClrSystemHandler<ICompletedEvaluator>(); _taskMessageSubscriber = new 
ClrSystemHandler<ITaskMessage>(); @@ -384,9 +388,12 @@ namespace Org.Apache.REEF.Driver.Bridge } } - internal ISet<IConfigurationProvider> ConfigurationProviders - { - get { return _configurationProviders; } + /// <summary> + /// Serialized configuration string for configurations from configuration providers. + /// </summary> + internal string ConfigurationStringForProviders + { + get { return _configurationProviderString; } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs index 174ec72..3e70d67 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs @@ -16,14 +16,12 @@ // under the License. using System; -using System.Collections.Generic; using System.Runtime.Serialization; using Org.Apache.REEF.Common.Catalog; using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Logging; @@ -31,7 +29,7 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Driver.Bridge.Events { [DataContract] - internal class AllocatedEvaluator : IAllocatedEvaluator + internal sealed class AllocatedEvaluator : IAllocatedEvaluator { private static readonly Logger LOGGER = Logger.GetLogger(typeof(AllocatedEvaluator)); @@ -41,23 +39,19 @@ namespace Org.Apache.REEF.Driver.Bridge.Events private readonly string _evaluatorConfigStr; - public AllocatedEvaluator(IAllocatedEvaluatorClr2Java clr2Java, 
ISet<IConfigurationProvider> configurationProviders) + public AllocatedEvaluator(IAllocatedEvaluatorClr2Java clr2Java, string configuration) { - _serializer = new AvroConfigurationSerializer(); - - var evaluatorConfig = TangFactory.GetTang().NewConfigurationBuilder().Build(); - foreach (var configurationProvider in configurationProviders) + using (LOGGER.LogFunction("AllocatedEvaluator::AllocatedEvaluator:", clr2Java.GetId())) { - evaluatorConfig = Configurations.Merge(evaluatorConfig, configurationProvider.GetConfiguration()); - } - - _evaluatorConfigStr = _serializer.ToString(evaluatorConfig); + _serializer = TangFactory.GetTang().NewInjector().GetInstance<AvroConfigurationSerializer>(); + _evaluatorConfigStr = configuration; - Clr2Java = clr2Java; - Id = Clr2Java.GetId(); - ProcessNewEvaluator(); + Clr2Java = clr2Java; + Id = Clr2Java.GetId(); + ProcessNewEvaluator(); - NameServerInfo = Clr2Java.GetNameServerInfo(); + NameServerInfo = Clr2Java.GetNameServerInfo(); + } } public string Id { get; private set; } @@ -134,23 +128,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Events private void ProcessNewEvaluator() { _evaluatorDescriptor = Clr2Java.GetEvaluatorDescriptor(); - lock (EvaluatorRequestor.Evaluators) - { - foreach (KeyValuePair<string, IEvaluatorDescriptor> pair in EvaluatorRequestor.Evaluators) - { - if (pair.Value.Equals(_evaluatorDescriptor)) - { - var key = pair.Key; - EvaluatorRequestor.Evaluators.Remove(key); - var assignedId = key.Substring(0, key.LastIndexOf(EvaluatorRequestor.BatchIdxSeparator)); - - LOGGER.Log(Level.Verbose, "Received evaluator [{0}] of memory {1}MB that matches request of {2}MB with batch id [{3}].", - Id, _evaluatorDescriptor.Memory, pair.Value.Memory, assignedId); - EvaluatorBatchId = assignedId; - break; - } - } - } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 252bf75..b83b705 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -165,6 +165,9 @@ under the License. <Compile Include="Functional\Telemetry\MetricsDriver.cs" /> <Compile Include="Functional\Telemetry\MetricsTask.cs" /> <Compile Include="Functional\Telemetry\TestMetricsMessage.cs" /> + <Compile Include="Performance\TestHelloREEF\TestHelloDriver.cs" /> + <Compile Include="Performance\TestHelloREEF\TestHelloREEFClient.cs" /> + <Compile Include="Performance\TestHelloREEF\TestHelloTask.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utility\TestDriverConfigGenerator.cs" /> <Compile Include="Utility\TestExceptions.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs new file mode 100644 index 0000000..2609857 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF +{ + /// <summary> + /// The Driver for TestHelloREEF: it requests NumberOfContainers Evaluators and submits a TestHelloTask to each allocated Evaluator. + /// </summary> + public sealed class TestHelloDriver : IObserver<IAllocatedEvaluator>, + IObserver<IDriverStarted>, + IObserver<IFailedEvaluator>, + IObserver<IFailedTask>, + IObserver<ICompletedTask>, + IObserver<IRunningTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloDriver)); + private readonly IEvaluatorRequestor _evaluatorRequestor; + + /// <summary> + /// Specifies whether locality (the desired node names) of the evaluator request is relaxed + /// </summary> + private readonly bool _relaxLocality; + + private readonly int _numberOfContainers; + + /// <summary> + /// Constructor of the driver + /// </summary> + /// <param name="evaluatorRequestor">Evaluator Requestor</param> + /// <param name="relaxLocality">Relax indicator of evaluator node request</param> + /// <param name="numberOfContainers">Number of evaluators (containers) to request</param> + [Inject] + private TestHelloDriver(IEvaluatorRequestor evaluatorRequestor, + [Parameter(typeof(RelaxLocality))] bool relaxLocality, + [Parameter(typeof(NumberOfContainers))] int numberOfContainers) + { + Logger.Log(Level.Info, "HelloDriverYarn Driver:
numberOfContainers: {0}.", numberOfContainers); + _evaluatorRequestor = evaluatorRequestor; + _relaxLocality = relaxLocality; + _numberOfContainers = numberOfContainers; + } + + /// <summary> + /// Submits the HelloTask to the Evaluator. + /// </summary> + /// <param name="allocatedEvaluator"></param> + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + var msg = string.Format("Received allocatedEvaluator-HostName: {0}, id {1}", + allocatedEvaluator.GetEvaluatorDescriptor().NodeDescriptor.HostName, + allocatedEvaluator.Id); + using (Logger.LogFunction("IAllocatedEvaluator handler:", msg)) + { + var taskConfiguration = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "HelloTask-" + allocatedEvaluator.Id) + .Set(TaskConfiguration.Task, GenericType<TestHelloTask>.Class) + .Build(); + allocatedEvaluator.SubmitTask(taskConfiguration); + } + } + + /// <summary> + /// Called to start the user mode driver + /// </summary> + /// <param name="driverStarted"></param> + public void OnNext(IDriverStarted driverStarted) + { + Logger.Log(Level.Info, "Received IDriverStarted, numberOfContainers: {0}", _numberOfContainers); + + _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder() + .SetMegabytes(64) + .SetNumber(_numberOfContainers) + .SetRelaxLocality(_relaxLocality) + .SetCores(1) + .Build()); + } + + /// <summary> + /// A simple ICompletedTask handler. + /// </summary> + /// <param name="value"></param> + void IObserver<ICompletedTask>.OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, "Received ICompletedTask: {0} with evaluator id: {1}.", value.Id, value.ActiveContext.EvaluatorId); + value.ActiveContext.Dispose(); + } + + /// <summary> + /// A simple IFailedTask handler. 
+ /// </summary> + /// <param name="value"></param> + void IObserver<IFailedTask>.OnNext(IFailedTask value) + { + Logger.Log(Level.Info, "Received IFailedTask: {0} with evaluator id: {1}.", value.Id, value.GetActiveContext().Value.EvaluatorId); + value.GetActiveContext().Value.Dispose(); + } + + /// <summary> + /// A simple IFailedEvaluator handler. + /// </summary> + /// <param name="value"></param> + void IObserver<IFailedEvaluator>.OnNext(IFailedEvaluator value) + { + Logger.Log(Level.Info, "Received IFailedEvaluator: {0}.", value.Id); + } + + /// <summary> + /// A simple IRunningTask handler. + /// </summary> + /// <param name="value"></param> + void IObserver<IRunningTask>.OnNext(IRunningTask value) + { + Logger.Log(Level.Info, "Received IRunningTask: {0} with evaluator id: {1}", value.Id, value.ActiveContext.EvaluatorId); + } + + public void OnError(Exception error) + { + throw error; + } + + public void OnCompleted() + { + } + } + + [NamedParameter(documentation: "RelaxLocality for specifying evaluator node names", shortName: "RelaxLocality", defaultValue: "true")] + internal class RelaxLocality : Name<bool> + { + } + + [NamedParameter(documentation: "NumberOfContainers", defaultValue: "1")] + internal class NumberOfContainers : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs new file mode 100644 index 0000000..454cd32 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using Newtonsoft.Json; +using Org.Apache.REEF.Client.API; +using Org.Apache.REEF.Client.Avro.YARN; +using Org.Apache.REEF.Client.Common; +using Org.Apache.REEF.Client.Local; +using Org.Apache.REEF.Client.Yarn; +using Org.Apache.REEF.Client.YARN.RestClient.DataModel; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF +{ + /// <summary> + /// Test Hello REEF for scalability + /// </summary> + [Collection("PerformanceTests")] + public class TestHelloREEFClient : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloREEFClient)); + + private const int ReTryCounts = 300; + private const int SleepTime = 2000; + private const string DefaultPortRangeStart = "2000"; + private const string DefaultPortRangeCount = "20"; + private const string TrustedApplicationTokenIdentifier = "TrustedApplicationTokenIdentifier"; + + /// <summary> + /// Test
HelloREEF on local runtime. + /// </summary> + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test Hello Handler on local runtime")] + public void TestHelloREEFOnLocal() + { + int numberOfContainers = 5; + int driverMemory = 1024; + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(GetRuntimeConfigurationForLocal(numberOfContainers, testFolder), driverMemory); + CleanUp(testFolder); + } + + /// <summary> + /// Test HelloREEF on YARN. + /// The parameters are provided as command line arguments: token password numberOfContainers + /// [portRangeStart [portRangeCount]], e.g. TestDriver.exe TrustedApplication001 none 2000 + /// NOTE(review): xUnit [Fact] methods cannot take parameters, so the xUnit runner cannot supply args; + /// this method presumably is meant to be invoked directly by a test driver - confirm. + /// </summary> + /// <param name="args">token, password, numberOfContainers and optional port range start/count.</param> + [Fact] + [Trait("Environment", "Yarn")] + [Trait("Priority", "1")] + [Trait("Description", "Run CLR Test on Yarn")] + public void TestHelloREEFOnYarn(string[] args) + { + TestRun(GetRuntimeConfigurationForYarn(args), 10240); + } + + /// <summary> + /// Submits the TestHelloREEF job with the given runtime configuration and asserts that it ends in FinalState.SUCCEEDED.
+ /// </summary> + /// <param name="config">runtime configuration.</param> + /// <param name="driverMemory">driver memory in MB.</param> + private void TestRun(IConfiguration config, int driverMemory) + { + IInjector injector = TangFactory.GetTang().NewInjector(config); + var jobRequestBuilder = injector.GetInstance<JobRequestBuilder>(); + var reefClient = injector.GetInstance<IREEFClient>(); + var numberOfContainers = injector.GetNamedInstance<NumberOfContainers, int>(GenericType<NumberOfContainers>.Class); + + //// The driver configuration contains all the needed handler bindings + var helloDriverConfiguration = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.OnDriverStarted, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<TestHelloDriver>.Class) + .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build(); + + var driverConfig = TangFactory.GetTang() + .NewConfigurationBuilder(helloDriverConfiguration) + .BindIntNamedParam<NumberOfContainers>(numberOfContainers.ToString()); + + // The JobSubmission contains the Driver configuration as well as the files needed on the Driver.
+ var helloJobRequest = jobRequestBuilder + .AddDriverConfiguration(driverConfig.Build()) + .AddGlobalAssemblyForType(typeof(TestHelloDriver)) + .SetJobIdentifier("TestHelloREEF") + .SetDriverMemory(driverMemory) + .Build(); + + var result = reefClient.SubmitAndGetJobStatus(helloJobRequest); + Assert.NotNull(result); + var state = PullFinalJobStatus(result); + Logger.Log(Level.Info, "Application final state : {0}.", state); + Assert.Equal(FinalState.SUCCEEDED, state); + } + + /// <summary> + /// Builds the YARN client runtime configuration from the command line arguments. + /// </summary> + /// <param name="args">token, password, numberOfContainers and optional port range start/count.</param> + /// <returns>YARN client, TCP port range and NumberOfContainers configurations merged together.</returns> + /// <exception cref="ArgumentException">Thrown when fewer than three arguments are provided.</exception> + private static IConfiguration GetRuntimeConfigurationForYarn(string[] args) + { + if (args == null || args.Length < 3) + { + throw new ArgumentException( + "Expected arguments: token password numberOfContainers [portRangeStart [portRangeCount]].", + "args"); + } + + var token = new SecurityToken( + TrustedApplicationTokenIdentifier, + TrustedApplicationTokenIdentifier, + ByteUtilities.StringToByteArrays(args[0]), + Encoding.ASCII.GetBytes(args[1])); + + var clientConfig = YARNClientConfiguration.ConfigurationModule + .Set(YARNClientConfiguration.SecurityTokenStr, JsonConvert.SerializeObject(token)) + .Build(); + + var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule + .Set(TcpPortConfigurationModule.PortRangeStart, args.Length > 3 ? args[3] : DefaultPortRangeStart) + .Set(TcpPortConfigurationModule.PortRangeCount, args.Length > 4 ?
args[4] : DefaultPortRangeCount) + .Build(); + + var containerCountConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<NumberOfContainers>(args[2]) + .Build(); + + return Configurations.Merge(clientConfig, tcpPortConfig, containerCountConfig); + } + + /// <summary> + /// Builds the local runtime configuration with the given number of evaluators and runtime folder. + /// </summary> + /// <param name="numberOfContainers">number of local evaluators; also bound to the NumberOfContainers parameter.</param> + /// <param name="testFolder">runtime folder for the local runtime.</param> + private static IConfiguration GetRuntimeConfigurationForLocal(int numberOfContainers, string testFolder) + { + var runtimeConfig = LocalRuntimeClientConfiguration.ConfigurationModule + .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, numberOfContainers.ToString()) + .Set(LocalRuntimeClientConfiguration.RuntimeFolder, testFolder) + .Build(); + + return TangFactory.GetTang().NewConfigurationBuilder(runtimeConfig) + .BindIntNamedParam<NumberOfContainers>(numberOfContainers.ToString()) + .Build(); + } + + /// <summary> + /// Polls the job's final status every SleepTime milliseconds, re-reading it at most ReTryCounts times, + /// until it is no longer FinalState.UNDEFINED. + /// </summary> + /// <param name="jobSubmissionResult">result returned by the job submission.</param> + /// <returns>the last observed final state; still UNDEFINED if polling ran out of retries.</returns> + private FinalState PullFinalJobStatus(IJobSubmissionResult jobSubmissionResult) + { + int n = 0; + var state = jobSubmissionResult.FinalState; + while (state.Equals(FinalState.UNDEFINED) && n++ < ReTryCounts) + { + Thread.Sleep(SleepTime); + state = jobSubmissionResult.FinalState; + } + return state; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs new file mode 100644 index 0000000..b9aac5d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloTask.cs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Tests.Performance.TestHelloREEF +{ + /// <summary> + /// A Task that merely prints a greeting and exits. + /// </summary> + public sealed class TestHelloTask : ITask + { + [Inject] + private TestHelloTask() + { + } + + public void Dispose() + { + Console.WriteLine("Disposed."); + } + + public byte[] Call(byte[] memento) + { + Console.WriteLine("Hello, REEF!"); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs index 1733639..8ee9691 100644 --- a/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs +++ b/lang/cs/Org.Apache.REEF.Utilities/Logging/Logger.cs @@ -145,7 +145,7 @@ namespace Org.Apache.REEF.Utilities.Logging /// <summary> /// Log the message with the specified Log Level. /// - /// If addtional arguments are passed, the message will be treated as + /// If additional arguments are passed, the message will be treated as /// a format string.
The format string and the additional arguments /// will be formatted according to string.Format() /// </summary> @@ -161,7 +161,7 @@ namespace Org.Apache.REEF.Utilities.Logging DateTime.Now.ToString("o", CultureInfo.InvariantCulture) + " " + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString("D4", CultureInfo.InvariantCulture) - + Environment.NewLine + LogLevel[(int)level] + ": " + + " : " + LogLevel[(int)level] + ": " + msg; _traceSource.TraceEvent( http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java index 9f2a57a..568a3d7 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java @@ -24,6 +24,9 @@ import org.apache.reef.io.naming.Identifiable; import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl; import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.logging.Level; import java.util.logging.Logger; @@ -61,6 +64,11 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden public void submitContextAndTaskString(final String evaluatorConfigurationString, final String contextConfigurationString, final String taskConfigurationString) { + + if (LOG.isLoggable(Level.FINE)) { + // Only allocate the formatter and format the timestamp when FINE logging is enabled; + // this method is on the per-evaluator submission path. + final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + LOG.log(Level.FINE, "AllocatedEvaluatorBridge:submitContextAndTaskString for evaluator id: {0}, time: {1}.", + new Object[] {evaluatorId,
dateFormat.format(new Date())}); + } + if (evaluatorConfigurationString.isEmpty()) { throw new RuntimeException("empty evaluatorConfigurationString provided."); } http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 83f783f..de9f95b 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -40,7 +40,9 @@ public final class NativeInterop { public static native void clrSystemAllocatedEvaluatorHandlerOnNext( final long handle, final AllocatedEvaluatorBridge javaEvaluatorBridge, - final InteropLogger interopLogger + final InteropLogger interopLogger, + final String nameServerInfo, + final String evaluatorId ); public static native void clrSystemActiveContextHandlerOnNext( http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index 8a0fb86..ddc3385 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -53,7 +53,10 @@ import javax.servlet.http.HttpServletResponse; import java.io.*; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.text.DateFormat;
+import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; @@ -109,7 +112,7 @@ public final class JobDriver { /** * Map from context ID to running evaluator context. */ - private final Map<String, ActiveContext> contexts = new HashMap<>(); + private final ConcurrentHashMap<String, ActiveContext> contexts = new ConcurrentHashMap<>(); private final REEFFileNames reefFileNames; private final LocalAddressProvider localAddressProvider; @@ -118,13 +121,14 @@ public final class JobDriver { */ private final LoggingScopeFactory loggingScopeFactory; private final Set<String> definedRuntimes; + // SimpleDateFormat is not thread-safe and the evaluator/context handlers are no longer synchronized, + // so a formatter is created per use from this pattern instead of sharing one instance. + private static final String TIMESTAMP_PATTERN = "yyyy/MM/dd HH:mm:ss"; private BridgeHandlerManager handlerManager = null; private boolean isRestarted = false; // We are holding on to following on bridge side. // Need to add references here so that GC does not collect them.
- private final HashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges = - new HashMap<>(); + private final ConcurrentHashMap<String, AllocatedEvaluatorBridge> allocatedEvaluatorBridges = + new ConcurrentHashMap<>(); private EvaluatorRequestorBridge evaluatorRequestorBridge; @@ -230,41 +234,40 @@ public final class JobDriver { } private void submitEvaluator(final AllocatedEvaluator eval, final EvaluatorProcess process) { - synchronized (JobDriver.this) { - eval.setProcess(process); - LOG.log(Level.INFO, "Allocated Evaluator: {0}, total running running {1}", - new Object[]{eval.getId(), JobDriver.this.contexts.size()}); - if (JobDriver.this.handlerManager.getAllocatedEvaluatorHandler() == 0) { - throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); - } - final AllocatedEvaluatorBridge allocatedEvaluatorBridge = - this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo); - allocatedEvaluatorBridges.put(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge); - NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext( - JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(), allocatedEvaluatorBridge, this.interopLogger); + eval.setProcess(process); + LOG.log(Level.FINE, "Allocated Evaluator: {0}, total running {1}.", + new Object[]{eval.getId(), JobDriver.this.contexts.size()}); + // Read the CLR handle once so the zero-handle check and the native call use the same value. + final long handler = JobDriver.this.handlerManager.getAllocatedEvaluatorHandler(); + if (0 == handler) { + throw new RuntimeException("Allocated Evaluator Handler not initialized by CLR."); } + final AllocatedEvaluatorBridge allocatedEvaluatorBridge = + this.allocatedEvaluatorBridgeFactory.getAllocatedEvaluatorBridge(eval, this.nameServerInfo); + // Keep a reference so the bridge object is not garbage collected while the CLR side uses it. + allocatedEvaluatorBridges.putIfAbsent(allocatedEvaluatorBridge.getId(), allocatedEvaluatorBridge); + NativeInterop.clrSystemAllocatedEvaluatorHandlerOnNext( + handler, allocatedEvaluatorBridge, this.interopLogger, this.nameServerInfo, eval.getId()); +
+ if (LOG.isLoggable(Level.FINE)) { + final DateFormat dateFormat = new SimpleDateFormat(TIMESTAMP_PATTERN); + LOG.log(Level.FINE, "End of JobDriver.Allocated Evaluator: {0}, time: {1}", + new Object[] {eval.getId(), dateFormat.format(new Date())}); + } } private void handleFailedEvaluator(final FailedEvaluator eval, final boolean isRestartFailed) { try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.SEVERE, "FailedEvaluator", eval); - for (final FailedContext failedContext : eval.getFailedContextList()) { - final String failedContextId = failedContext.getId(); - LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts."); - JobDriver.this.contexts.remove(failedContextId); - } - String message = "Evaluator " + eval.getId() + " failed with message: " - + eval.getEvaluatorException().getMessage(); - JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); + LOG.log(Level.SEVERE, "FailedEvaluator: {0}", eval.getId()); + for (final FailedContext failedContext : eval.getFailedContextList()) { + final String failedContextId = failedContext.getId(); + LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts."); + JobDriver.this.contexts.remove(failedContextId); + } + final String message = "Evaluator " + eval.getId() + " failed with message: " + + eval.getEvaluatorException().getMessage(); + JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes(StandardCharsets.UTF_8)); - if (isRestartFailed) { - evaluatorFailedHandlerWaitForCLRBridgeSetup( - JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed); - } else { - evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(), - eval, isRestartFailed); - } + if (isRestartFailed) { + evaluatorFailedHandlerWaitForCLRBridgeSetup( + JobDriver.this.handlerManager.getDriverRestartFailedEvaluatorHandler(), eval, isRestartFailed); + } else {
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(JobDriver.this.handlerManager.getFailedEvaluatorHandler(), + eval, isRestartFailed); } } } @@ -347,10 +350,8 @@ public final class JobDriver { @Override public void onNext(final AllocatedEvaluator allocatedEvaluator) { try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext"); - JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess()); - } + LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext"); + JobDriver.this.submitEvaluator(allocatedEvaluator, clrProcessFactory.newEvaluatorProcess()); } } } @@ -362,12 +363,10 @@ public final class JobDriver { @Override public void onNext(final ActiveContext context) { try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) { - synchronized (JobDriver.this) { - LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", - new Object[]{context.getId()}); - JobDriver.this.contexts.put(context.getId(), context); - JobDriver.this.submit(context); - } + LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0}", + new Object[]{context.getId()}); + JobDriver.this.contexts.put(context.getId(), context); + JobDriver.this.submit(context); } } } @@ -733,9 +732,7 @@ public final class JobDriver { NativeInterop.clrSystemClosedContextHandlerOnNext(JobDriver.this.handlerManager.getClosedContextHandler(), closedContextBridge); } - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } + JobDriver.this.contexts.remove(context.getId()); } } } @@ -757,9 +754,7 @@ public final class JobDriver { NativeInterop.clrSystemFailedContextHandlerOnNext(JobDriver.this.handlerManager.getFailedContextHandler(), failedContextBridge); } - synchronized (JobDriver.this) { - JobDriver.this.contexts.remove(context.getId()); - } +
+ JobDriver.this.contexts.remove(context.getId()); final Optional<byte[]> err = context.getData(); if (err.isPresent()) { JobDriver.this.jobMessageObserver.sendMessageToClient(err.get()); http://git-wip-us.apache.org/repos/asf/reef/blob/e18b5d36/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java index 72aa640..5df89e7 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnSubmissionHelper.java @@ -281,7 +281,8 @@ public final class YarnSubmissionHelper implements AutoCloseable { launchCommand, this.resources, tokenProvider.getTokens()); this.applicationSubmissionContext.setAMContainerSpec(containerLaunchContext); - LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", this.applicationId); + LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}, driver core: {1}", + new Object[] {this.applicationId, this.applicationSubmissionContext.getResource().getVirtualCores()}); if (LOG.isLoggable(Level.INFO)) { LOG.log(Level.INFO, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
