Repository: reef Updated Branches: refs/heads/master d609c002c -> 4d1b80e10
[REEF-1842] Making IMRU task and input data association deterministic * Use partitionDescriptor id as part of the context id so that context id is associated with a fixed partitionDescriptor. Please note that the partition descriptor id always follows a pattern like RandomInputPartition-x and is unique * Store contexts in a sorted collection to ensure tasks with associated context/partitionDescriptor are always added to the group in the same sequence. JIRA: [REEF-1842](https://issues.apache.org/jira/browse/REEF-1842) This closes #1346 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4d1b80e1 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4d1b80e1 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4d1b80e1 Branch: refs/heads/master Commit: 4d1b80e107da81510ec15e9ca72569fc6cbbac72 Parents: d609c00 Author: Julia Wang <[email protected]> Authored: Wed Jul 26 19:04:17 2017 -0700 Committer: Sergiy Matusevych <[email protected]> Committed: Wed Aug 30 15:49:51 2017 -0700 ---------------------------------------------------------------------- .../OnREEF/Driver/ActiveContextManager.cs | 4 +- .../OnREEF/Driver/IMRUDriver.cs | 1 + .../PartitionDescriptorContextIdBundle.cs | 41 ++++++++ .../ServiceAndContextConfigurationProvider.cs | 101 ++++++++++++------- .../Org.Apache.REEF.IMRU.csproj | 1 + 5 files changed, 112 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs index 437b76f..d73abf6 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs +++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs @@ -35,7 +35,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver internal sealed class ActiveContextManager : IDisposable, IObservable<IEnumerable<IActiveContext>> { private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager)); - private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>(); + private readonly IDictionary<string, IActiveContext> _activeContexts = new SortedDictionary<string, IActiveContext>(); private readonly int _totalExpectedContexts; private IObserver<IEnumerable<IActiveContext>> _activeContextObserver; @@ -112,7 +112,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver if (AreAllContextsReceived && _activeContextObserver != null) { - _activeContextObserver.OnNext(_activeContexts.Values); + _activeContextObserver.OnNext(ActiveContexts); } } http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 308a079..3db1bcd 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -392,6 +392,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId); commGroup.AddTask(taskId); taskIdAndContextMapping.Add(taskId, activeContext); + Logger.Log(Level.Info, "Adding {0} with associated context: {1} to communication group: {2}.", taskId, activeContext.Id, IMRUConstants.CommunicationGroupName); } foreach (var mapping in taskIdAndContextMapping) http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs new file mode 100644 index 0000000..bb0b118 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/PartitionDescriptorContextIdBundle.cs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace Org.Apache.REEF.IMRU.OnREEF.Driver +{ + /// <summary> + /// Keeps the mapping between context id and partition descriptor id so that a context with the same id is always bundled with the same partition data. + /// </summary> + internal sealed class PartitionDescriptorContextIdBundle + { + /// <summary> + /// Creates a PartitionDescriptorContextIdBundle object to maintain the relationship. 
+ /// If an evaluator failed and a new context is created, the same context id with the same partition data will be assigned to the new context + /// </summary> + /// <param name="partitionDescriptorId"></param> + /// <param name="contextId"></param> + internal PartitionDescriptorContextIdBundle(string partitionDescriptorId, string contextId) + { + PartitionDescriptorId = partitionDescriptorId; + ContextId = contextId; + } + + internal string PartitionDescriptorId { get; private set; } + + internal string ContextId { get; private set; } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs index 0d4e27b..a5c8422 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs @@ -40,24 +40,46 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver [NotThreadSafe] internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> { - private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>)); + private static readonly Logger Logger + = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>)); - private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>(); - private readonly Stack<string> _partitionDescriptorIds = new Stack<string>(); + /// <summary> + /// Mapping between Evaluator id and assigned partition descriptor/context ids + /// </summary> + private readonly Dictionary<string, 
PartitionDescriptorContextIdBundle> _partitionContextIdProvider + = new Dictionary<string, PartitionDescriptorContextIdBundle>(); + + /// <summary> + /// Available partition descriptor and context ids stack + /// </summary> + private readonly Stack<PartitionDescriptorContextIdBundle> _availablePartitionDescriptorContextIds + = new Stack<PartitionDescriptorContextIdBundle>(); + + /// <summary> + /// Input partition data set + /// </summary> private readonly IPartitionedInputDataSet _dataset; + + /// <summary> + /// Configuration manager that provides configurations + /// </summary> private readonly ConfigurationManager _configurationManager; /// <summary> - /// Constructs the object witch maintains partitionDescriptor Ids so that to provide proper data load configuration + /// Constructs the object which maintains partitionDescriptor ids in order to provide the proper data load configuration. + /// It also maintains the partitionDescriptor id and context id mapping to ensure the same context id is always assigned the same data partition. + /// This ensures that if the tasks are added to the topology based on the sequence of context ids, the result is deterministic. 
/// </summary> /// <param name="dataset">partition input dataset</param> /// <param name="configurationManager">Configuration manager that holds configurations for context and tasks</param> internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset, ConfigurationManager configurationManager) { _dataset = dataset; + int contextSequenceNumber = 0; foreach (var descriptor in _dataset) { - _partitionDescriptorIds.Push(descriptor.Id); + var contextId = string.Format("DataLoadingContext-{0}", contextSequenceNumber++); + _availablePartitionDescriptorContextIds.Push(new PartitionDescriptorContextIdBundle(descriptor.Id, contextId)); } _configurationManager = configurationManager; } @@ -70,13 +92,19 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <returns>Whether failed evaluator is master or not</returns> internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId) { - if (!_partitionIdProvider.ContainsKey(evaluatorId)) + PartitionDescriptorContextIdBundle partitionDescriptor; + if (_partitionContextIdProvider.TryGetValue(evaluatorId, out partitionDescriptor)) + { + _availablePartitionDescriptorContextIds.Push(partitionDescriptor); + _partitionContextIdProvider.Remove(evaluatorId); + } + else { - var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); + var msg = string.Format(CultureInfo.InvariantCulture, + "Partition descriptor for Failed evaluator:{0} not present", + evaluatorId); + throw new IMRUSystemException(msg); } - _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]); - _partitionIdProvider.Remove(evaluatorId); } /// <summary> @@ -105,13 +133,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <returns></returns> internal string GetPartitionIdByEvaluatorId(string evaluatorId) { - if (!_partitionIdProvider.ContainsKey(evaluatorId)) + PartitionDescriptorContextIdBundle 
partitionDescriptorContextId; + _partitionContextIdProvider.TryGetValue(evaluatorId, out partitionDescriptorContextId); + + if (partitionDescriptorContextId == null) { - var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); + var msg = string.Format(CultureInfo.InvariantCulture, + "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId); + throw new IMRUSystemException(msg); } - - return _partitionIdProvider[evaluatorId]; + return partitionDescriptorContextId.PartitionDescriptorId; } /// <summary> @@ -119,51 +150,53 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// evaluator or new configuration /// </summary> /// <param name="evaluatorId"></param> - /// <returns></returns> + /// <returns>Configuration for context and service</returns> internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId) { - if (_partitionDescriptorIds.Count == 0) + try { - Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger); + Logger.Log(Level.Info, "Getting a new data loading configuration"); + _partitionContextIdProvider.Add(evaluatorId, _availablePartitionDescriptorContextIds.Pop()); } - - if (_partitionIdProvider.ContainsKey(evaluatorId)) + catch (InvalidOperationException e) + { + throw new IMRUSystemException("No more data configuration can be provided", e); + } + catch (ArgumentException e) { var msg = string.Format( CultureInfo.InvariantCulture, "Evaluator Id:{0} already present in configuration cache, they have to be unique", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); + throw new IMRUSystemException(msg, e); } - Logger.Log(Level.Info, "Getting a new data loading configuration"); - _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop(); - try { + var partitionIdContextId = 
_partitionContextIdProvider[evaluatorId]; IPartitionDescriptor partitionDescriptor = - _dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]); - return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId); + _dataset.GetPartitionDescriptorForId(partitionIdContextId.PartitionDescriptorId); + return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, partitionIdContextId.ContextId); } catch (Exception e) { - var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset", - _partitionIdProvider[evaluatorId]); + var msg = string.Format(CultureInfo.InvariantCulture, + "Error while trying to access partition descriptor:{0} from dataset", + _partitionContextIdProvider[evaluatorId]); Exceptions.Throw(e, msg, Logger); return null; } } /// <summary> - /// Creates service and data loading context configuration for given evaluator id + /// Creates service and data loading context configuration for given context id and partition descriptor /// </summary> /// <param name="partitionDescriptor"></param> - /// <param name="evaluatorId"></param> - /// <returns></returns> + /// <param name="contextId"></param> + /// <returns>Configuration for context and service</returns> private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration( - IPartitionDescriptor partitionDescriptor, - string evaluatorId) + IPartitionDescriptor partitionDescriptor, string contextId) { var dataLoadingContextConf = TangFactory.GetTang() @@ -185,7 +218,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .Build(); var contextConf = ContextConfiguration.ConfigurationModule - .Set(ContextConfiguration.Identifier, string.Format("DataLoading-{0}", evaluatorId)) + .Set(ContextConfiguration.Identifier, contextId) .Build(); return new ContextAndServiceConfiguration(contextConf, serviceConf); } 
http://git-wip-us.apache.org/repos/asf/reef/blob/4d1b80e1/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index aaf93fb..be74b86 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -85,6 +85,7 @@ under the License. <Compile Include="OnREEF\Driver\IMRUSystemException.cs" /> <Compile Include="OnREEF\Driver\IMRUConstants.cs" /> <Compile Include="OnREEF\Driver\IMRUDriver.cs" /> + <Compile Include="OnREEF\Driver\PartitionDescriptorContextIdBundle.cs" /> <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" /> <Compile Include="OnREEF\Driver\StateMachine\TaskStateMachine.cs" /> <Compile Include="OnREEF\Driver\StateMachine\StateTransition.cs" />
