Repository: reef Updated Branches: refs/heads/master 267140543 -> affd9735d
[REEF-1772] Implement IPipelineDataConverter in REEF.NET for arrays of data

This adds a new implementation for `IPipelineDataConverter` that supports arrays of arbitrary primitive types.

JIRA:
  [REEF-1772](https://issues.apache.org/jira/browse/REEF-1772)

Pull Request:
  This closes #1286


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/affd9735
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/affd9735
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/affd9735

Branch: refs/heads/master
Commit: affd9735d7cd63c8576b89dc119ef1c1f61b053a
Parents: 2671405
Author: roganc <[email protected]>
Authored: Tue Apr 11 14:12:47 2017 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Apr 12 17:19:44 2017 -0700

----------------------------------------------------------------------
 .../GroupCommunication/PipeliningTests.cs       | 230 +++++++++++++++++++
 .../Org.Apache.REEF.Network.Tests.csproj        |   1 +
 .../Config/GroupCommConfigurationOptions.cs     |   8 +
 .../Impl/ArrayPipelineDataConverter.cs          | 132 +++++++++++
 .../Org.Apache.REEF.Network.csproj              |   1 +
 5 files changed, 372 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/affd9735/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
new file mode 100644
index 0000000..d777086
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using Xunit;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+    /// <summary>
+    /// Defines pipelining component tests
+    /// </summary>
+    public class PipeliningTests
+    {
+        /// <summary>
+        /// Test the ArrayPipelineConverter with floats
+        /// </summary>
+        [Fact]
+        public void TestFloatArrayPipelineDataConverter()
+        {
+            float[] testArray = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with doubles
+        /// </summary>
+        [Fact]
+        public void TestDoubleArrayPipelineDataConverter()
+        {
+            double[] testArray = { 0.1, 0.2, 0.3, 0.4, 0.5 };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with ints
+        /// </summary>
+        [Fact]
+        public void TestIntArrayPipelineDataConverter()
+        {
+            int[] testArray = { 1, 2, 3, 4, 5 };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with longs
+        /// </summary>
+        [Fact]
+        public void TestLongArrayPipelineDataConverter()
+        {
+            long[] testArray = { 1L, 2L, 3L, 4L, 5L };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with generic objects
+        /// </summary>
+        [Fact]
+        public void TestObjectArrayPipelineDataConverter()
+        {
+            object[] testArray =
+            {
+                new { A = 1, B = 2, C = 3 },
+                new { A = 2, B = 3, C = 4 },
+                new { A = 3, B = 4, C = 5 },
+                new { A = 4, B = 5, C = 6 },
+                new { F = 5, G = 6, H = 7 }
+            };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with an empty array
+        /// </summary>
+        [Fact]
+        public void TestArrayPipelineDataConverterWithEmptyArray()
+        {
+            object[] testArray = new object[0];
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with a null array
+        /// </summary>
+        [Fact]
+        public void TestArrayPipelineDataConverterWithNullArray()
+        {
+            object[] testArray = null;
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Master test function for testing types of arrays in the ArrayPipelineDataConverter
+        /// </summary>
+        /// <typeparam name="T">The type of array to test</typeparam>
+        /// <param name="originalArray">An array to use in the test</param>
+        private static void TestArrayPipelineDataConverter<T>(T[] originalArray) where T : new()
+        {
+            // Verify that the constructor has the proper restrictions
+            AssertPositivePipelinePackageElementsRequired<T[], ArrayPipelineDataConverter<T>>();
+
+            // Test the valid case where we break up the array into smaller pieces
+            // First determine how many messages to create from originalArray
+            int pipelineMessageSize;
+            int nMessages;
+            if (originalArray == null)
+            {
+                nMessages = 0;
+                pipelineMessageSize = 1;
+            }
+            else if (originalArray.Length == 0 || originalArray.Length == 1)
+            {
+                nMessages = 1;
+                pipelineMessageSize = 1; // PipelineMessageSize must be positive for the converter to be injectable
+            }
+            else
+            {
+                nMessages = 2;
+                pipelineMessageSize = (int)Math.Ceiling(originalArray.Length / (double)nMessages);
+            }
+
+            // Test that the valid configuration can be injected
+            IConfiguration config = GetPipelineDataConverterConfig(pipelineMessageSize);
+            IPipelineDataConverter<T[]> dataConverter = TangFactory.GetTang().NewInjector(config).GetInstance<ArrayPipelineDataConverter<T>>();
+
+            var pipelineData = dataConverter.PipelineMessage(originalArray);
+
+            // Validate that the pipeline constructed the correct number of messages
+            Assert.Equal(nMessages, pipelineData.Count);
+
+            T[] deserializedArray = dataConverter.FullMessage(pipelineData);
+
+            // Validate that the array is unaffected by the serialization / deserialization
+            AssertArrayEquality(originalArray, deserializedArray);
+
+            // Verify that the "IsLast" property is set correctly
+            AssertIsLastFlag(pipelineData);
+        }
+
+        /// <summary>
+        /// Verify that the IPipelineDataConverter implementation requires a positive value for PipelineMessageSize
+        /// </summary>
+        /// <typeparam name="T">The message type handled by the converter</typeparam>
+        /// <typeparam name="DataConverter">The converter implementation under test</typeparam>
+        private static void AssertPositivePipelinePackageElementsRequired<T, DataConverter>()
+            where DataConverter : class, IPipelineDataConverter<T>
+        {
+            // Verify that the PipelineMessageSize cannot be zero
+            var configWithZeroElements = GetPipelineDataConverterConfig(0);
+            Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithZeroElements).GetInstance<DataConverter>());
+
+            // Verify that the PipelineMessageSize cannot be less than 0
+            var configWithNegativeElements = GetPipelineDataConverterConfig(-2);
+            Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithNegativeElements).GetInstance<DataConverter>());
+        }
+
+        /// <summary>
+        /// Validate that the IsLast flag is properly set on a list of pipeline messages
+        /// </summary>
+        /// <typeparam name="T">The payload type of the pipeline messages</typeparam>
+        /// <param name="pipelineData">A list of PipelineMessages</param>
+        private static void AssertIsLastFlag<T>(IList<PipelineMessage<T>> pipelineData)
+        {
+            // Verify that the "IsLast" property is set correctly
+            for (int i = 0; i < pipelineData.Count; i++)
+            {
+                Assert.Equal(i == pipelineData.Count - 1, pipelineData[i].IsLast);
+            }
+        }
+
+        /// <summary>
+        /// Generic array equality method; elements are compared with Assert.Equal, so equality for type T must be meaningful
+        /// </summary>
+        /// <typeparam name="T">The element type of the arrays</typeparam>
+        /// <param name="expected">The expected array; may be null</param>
+        /// <param name="actual">The actual array; may be null</param>
+        private static void AssertArrayEquality<T>(T[] expected, T[] actual)
+        {
+            // Two null arrays are considered to be equal
+            // Check to make sure that the arrays are both defined or undefined
+            Assert.True((expected == null) == (actual == null));
+
+            // If the arrays are both null, then don't check any further
+            if (expected == null && actual == null)
+            {
+                return;
+            }
+
+            Assert.Equal(expected.Length, actual.Length);
+
+            for (int i = 0; i < actual.Length; i++)
+            {
+                Assert.Equal(expected[i], actual[i]);
+            }
+        }
+
+        /// <summary>
+        /// Create a configuration with a PipelineMessageSize parameter
+        /// </summary>
+        /// <param name="pipelineMessageSize">The length of the individual messages in the pipeline</param>
+        /// <returns>A configuration binding GroupCommConfigurationOptions.PipelineMessageSize</returns>
+        private static IConfiguration GetPipelineDataConverterConfig(int pipelineMessageSize)
+        {
+            return TangFactory.GetTang()
+                .NewConfigurationBuilder()
+                .BindNamedParameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize), pipelineMessageSize.ToString(System.Globalization.CultureInfo.InvariantCulture))
+                .Build();
+        }
+    }
+}


http://git-wip-us.apache.org/repos/asf/reef/blob/affd9735/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 5bbc8c8..f430146 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -52,6 +52,7 @@ under the License.
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="GroupCommunication\PipeliningTests.cs" />
     <Compile Include="TcpClientConfigurationModuleTests.cs" />
     <Compile Include="BlockingCollectionExtensionTests.cs" />
     <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" />


http://git-wip-us.apache.org/repos/asf/reef/blob/affd9735/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
index 51463d3..1100376 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
@@ -113,5 +113,13 @@ namespace Org.Apache.REEF.Network.Group.Config
         public class Initialize : Name<bool>
         {
         }
+
+        /// <summary>
+        /// The maximum number of array elements to place into each message in the pipeline; must be positive.
+        /// </summary>
+        [NamedParameter("The number of elements to place into each message in the pipeline.", "PipelineMessageSize", "1000000")]
+        public sealed class PipelineMessageSize : Name<int>
+        {
+        }
     }
 }


http://git-wip-us.apache.org/repos/asf/reef/blob/affd9735/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
new file mode 100644
index 0000000..12540ac
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining.Impl
+{
+    /// <summary>
+    /// An implementation of IPipelineDataConverter for pipelining arrays of objects.
+    /// The array is split into consecutive chunks of at most PipelineMessageSize elements.
+    /// </summary>
+    /// <typeparam name="T">The element type of the array to pipeline</typeparam>
+    public sealed class ArrayPipelineDataConverter<T> : IPipelineDataConverter<T[]>
+    {
+        private readonly int _pipelineMessageSize;
+        private static readonly Logger Logger = Logger.GetLogger(typeof(ArrayPipelineDataConverter<T>));
+
+        /// <summary>
+        /// Creates a converter that places at most pipelineMessageSize elements in each message.
+        /// </summary>
+        /// <param name="pipelineMessageSize">The maximum number of elements per message; must be positive</param>
+        /// <exception cref="ArgumentOutOfRangeException">If pipelineMessageSize is not positive</exception>
+        [Inject]
+        private ArrayPipelineDataConverter([Parameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize))] int pipelineMessageSize)
+        {
+            if (pipelineMessageSize <= 0)
+            {
+                throw new ArgumentOutOfRangeException("pipelineMessageSize", pipelineMessageSize, "PipelineMessageSize must be strictly positive");
+            }
+            _pipelineMessageSize = pipelineMessageSize;
+        }
+
+        /// <summary>
+        /// Splits the original message into multiple messages of at most PipelineMessageSize elements each.
+        /// A null array yields an empty list; a zero-length array yields one empty message marked as last.
+        /// </summary>
+        /// <param name="message">The original message</param>
+        /// <returns>The list of pipelined messages, in order; only the final one has IsLast set</returns>
+        public List<PipelineMessage<T[]>> PipelineMessage(T[] message)
+        {
+            // A null message is encoded as an empty list of pipeline messages
+            if (message == null)
+            {
+                return new List<PipelineMessage<T[]>>();
+            }
+
+            // Special case: a zero-length array travels as one message holding a zero-length array
+            if (message.Length == 0)
+            {
+                return new List<PipelineMessage<T[]>> { new PipelineMessage<T[]>(new T[0], true) };
+            }
+
+            // Ceiling division; the constructor guarantees _pipelineMessageSize > 0, so every
+            // chunk below holds at least one element and offsets never exceed message.Length - 1
+            int messageCount = ((message.Length - 1) / _pipelineMessageSize) + 1;
+            var messageList = new List<PipelineMessage<T[]>>(messageCount);
+            for (int i = 0; i < messageCount; i++)
+            {
+                int offset = i * _pipelineMessageSize;
+                int subLen = Math.Min(message.Length - offset, _pipelineMessageSize);
+                T[] data = new T[subLen];
+                Array.Copy(message, offset, data, 0, subLen);
+                messageList.Add(new PipelineMessage<T[]>(data, i == messageCount - 1));
+            }
+
+            return messageList;
+        }
+
+        /// <summary>
+        /// Reassembles the full message from the received pipelined messages, in list order.
+        /// An empty list yields null (the encoding of a null message); a single message's
+        /// Data array is returned directly, without copying.
+        /// </summary>
+        /// <param name="pipelineMessage">A list of received pipelined messages</param>
+        /// <returns>The full reconstructed message</returns>
+        /// <exception cref="ArgumentNullException">If pipelineMessage is null</exception>
+        /// <exception cref="SerializationException">If one of several messages carries null data</exception>
+        public T[] FullMessage(List<PipelineMessage<T[]>> pipelineMessage)
+        {
+            if (pipelineMessage == null)
+            {
+                throw new ArgumentNullException("pipelineMessage");
+            }
+
+            // Null data corresponds to an empty list of pipeline messages
+            if (pipelineMessage.Count == 0)
+            {
+                return null;
+            }
+            if (pipelineMessage.Count == 1)
+            {
+                return pipelineMessage[0].Data;
+            }
+
+            if (pipelineMessage.Any(m => m.Data == null))
+            {
+                throw new SerializationException("Cannot reassemble a pipelined array: a message carried null data.");
+            }
+
+            // Zero-length chunks are allowed; if every chunk is empty the result is a zero-length array
+            T[] values = new T[pipelineMessage.Sum(m => m.Data.Length)];
+            int offset = 0;
+            foreach (var message in pipelineMessage)
+            {
+                Array.Copy(message.Data, 0, values, offset, message.Data.Length);
+                offset += message.Data.Length;
+            }
+
+            return values;
+        }
+    }
+}


http://git-wip-us.apache.org/repos/asf/reef/blob/affd9735/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 0b57f89..0beb953 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -85,6 +85,7 @@ under the License.
<Compile Include="Group\Operators\Impl\ScatterSender.cs" /> <Compile Include="Group\Operators\Impl\Sender.cs" /> <Compile Include="Group\Operators\IOperatorSpec.cs" /> + <Compile Include="Group\Pipelining\Impl\ArrayPipelineDataConverter.cs" /> <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" /> <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" /> <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />
