Repository: reef Updated Branches: refs/heads/master 5abb8b13a -> 184e4d954
[REEF-1835] Use stricter type constraints in Avro ProtocolSerializer * Also, minor refactoring in `ProtocolSerializer` and unit tests JIRA: [REEF-1835](https://issues.apache.org/jira/browse/REEF-1835) Pull request: This closes #1337 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/184e4d95 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/184e4d95 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/184e4d95 Branch: refs/heads/master Commit: 184e4d9541da9016a1ed08a4fb084bbba02de984 Parents: 5abb8b1 Author: Sergiy Matusevych <[email protected]> Authored: Wed Jul 19 18:32:54 2017 -0700 Committer: Doug Service <[email protected]> Committed: Sat Jul 22 01:44:34 2017 +0000 ---------------------------------------------------------------------- .../ProtocolSerializerTest.cs | 36 +++++------ .../Avro/IMessageInstance.cs | 36 +++++++++++ .../Avro/MessageInstance.cs | 27 +++++--- .../Avro/ProtocolSerializer.cs | 65 +++++++++++++------- .../Org.Apache.REEF.Wake.csproj | 3 +- 5 files changed, 117 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs index 8f4a1ca..cf3ac02 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs @@ -31,21 +31,19 @@ namespace Org.Apache.REEF.Wake.Tests /// <summary> /// Observer to receive and verify test message contents. /// </summary> - internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>> + internal sealed class TestMessageObserver : IObserver<IMessageInstance<AvroTestMessage>> { - int number; - string data; + private readonly IMessageInstance<AvroTestMessage> messageInstance; - public TestMessageObserver(int number, string data) + public TestMessageObserver(long seq, AvroTestMessage msg) { - this.number = number; - this.data = data; + messageInstance = new MessageInstance<AvroTestMessage>(seq, msg); } - public void OnNext(MessageInstance<AvroTestMessage> instance) + public void OnNext(IMessageInstance<AvroTestMessage> otherMessageInstance) { - Assert.Equal(instance.message.number, this.number); - Assert.Equal(instance.message.data, this.data); + Assert.Equal(messageInstance.Message.number, otherMessageInstance.Message.number); + Assert.Equal(messageInstance.Message.data, otherMessageInstance.Message.data); } public void OnError(Exception error) @@ -59,8 +57,7 @@ namespace Org.Apache.REEF.Wake.Tests } } - [Collection("FunctionalTests")] - public class TestProtocolSerializer + public sealed class TestProtocolSerializer { /// <summary> /// Setup two way communication between two remote managers through the loopback @@ -87,22 +84,23 @@ namespace Org.Apache.REEF.Wake.Tests { // Register observers for remote manager 1 and remote manager 2 var remoteEndpoint = new IPEndPoint(listeningAddress, 0); - var observer1 = Observer.Create<byte[]>(queue1.Add); - var observer2 = Observer.Create<byte[]>(queue2.Add); - remoteManager1.RegisterObserver(remoteEndpoint, observer1); - remoteManager2.RegisterObserver(remoteEndpoint, observer2); + remoteManager1.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue1.Add)); + remoteManager2.RegisterObserver(remoteEndpoint, Observer.Create<byte[]>(queue2.Add)); + + var msg1 = new AvroTestMessage(numbers[0], strings[0]); + var msg2 = new AvroTestMessage(numbers[1], strings[1]); // Remote manager 1 sends avro message to remote manager 2 var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint); - remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1)); + remoteObserver1.OnNext(serializer.Write(msg1, 1)); // Remote manager 2 sends avro message to remote manager 1 var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint); - remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2)); + remoteObserver2.OnNext(serializer.Write(msg2, 2)); // Verify the messages are properly received. - serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1])); - serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0])); + serializer.Read(queue1.Take(), new TestMessageObserver(2, msg2)); + serializer.Read(queue2.Take(), new TestMessageObserver(1, msg1)); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs new file mode 100755 index 0000000..1060d6f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Avro/IMessageInstance.cs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License + +namespace Org.Apache.REEF.Wake.Avro +{ + /// <summary> + /// An interface to a readonly pair of (message, sequence number). + /// </summary> + /// <typeparam name="T">Message payload type.</typeparam> + public interface IMessageInstance<out T> + { + /// <summary> + /// Get the sequence number of a message. + /// </summary> + long Sequence { get; } + + /// <summary> + /// Return the data payload of message instance. + /// </summary> + T Message { get; } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs index 96ac8a4..8560f1d 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs @@ -18,19 +18,30 @@ namespace Org.Apache.REEF.Wake.Avro { /// <summary> - /// Wrapper class to bind a specific instance of an Avro messagage - /// with it associated sequence number. + /// Wrapper class to bind a specific instance of a message with the associated sequence number. /// </summary> - /// <typeparam name="T"></typeparam> - public struct MessageInstance<T> + /// <typeparam name="T">Message payload type.</typeparam> + public sealed class MessageInstance<T> : IMessageInstance<T> { - public long sequence; - public T message; + /// <summary> + /// Get the sequence number of a message. + /// </summary> + public long Sequence { get; private set; } + /// <summary> + /// Return the data payload of message instance. + /// </summary> + public T Message { get; private set; } + + /// <summary> + /// Create a new instance of the (sequence number, message payload) pair. + /// </summary> + /// <param name="sequence">The message sequence number.</param> + /// <param name="message">The message payload.</param> public MessageInstance(long sequence, T message) { - this.sequence = sequence; - this.message = message; + Sequence = sequence; + Message = message; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs index dff30b1..4adcc88 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs @@ -36,36 +36,52 @@ namespace Org.Apache.REEF.Wake.Avro { private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer)); - // Delagates for message serializers and deserializers. + /// <summary> + /// Delegate for message serializer. + /// </summary> private delegate void Serialize(MemoryStream stream, object message); + + /// <summary> + /// Delegate for message deserializer. + /// </summary> + /// <param name="observer">Must be of type IObserver<IMessageInstance<?></param> private delegate void Deserialize(MemoryStream stream, object observer, long sequence); - // Message type to serialize/derserialize delagate. - private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>(); - private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>(); + /// <summary> + /// Map from message type (a string with the message class name) to serializer. + /// </summary> + private readonly SortedDictionary<string, Serialize> + serializeMap = new SortedDictionary<string, Serialize>(); + + /// <summary> + /// Map from message type (a string with the message class name) to deserializer. + /// </summary> + private readonly SortedDictionary<string, Deserialize> + deserializeMap = new SortedDictionary<string, Deserialize>(); private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>(); /// <summary> + /// Non-generic reflection record for the Register() method of this class. A constant. + /// </summary> + private static readonly MethodInfo RegisterMethodInfo = + typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic); + + /// <summary> /// Register all of the protocol messages using reflection. /// </summary> - /// <param name="assembly">The Assembley object which contains the namespace of the message classes.</param> + /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param> /// <param name="messageNamespace">A string which contains the namespace the protocol messages.</param> public ProtocolSerializer(Assembly assembly, string messageNamespace) { - MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic); - MethodInfo genericInfo; - - Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName); - List<Type> types = new List<Type>(assembly.GetTypes()); - types.Add(typeof(Header)); + Logr.Log(Level.Verbose, "Retrieving types for assembly: {0}", assembly.FullName); + var types = new List<Type>(assembly.GetTypes()) { typeof(Header) }; foreach (Type type in types) { - string name = type.FullName; - if (name.StartsWith(messageNamespace)) + if (type.FullName.StartsWith(messageNamespace)) { - genericInfo = registerInfo.MakeGenericMethod(new[] { type }); + MethodInfo genericInfo = RegisterMethodInfo.MakeGenericMethod(new[] { type }); genericInfo.Invoke(this, null); } } @@ -77,7 +93,8 @@ namespace Org.Apache.REEF.Wake.Avro /// <typeparam name="TMessage">The class type of the message being registered.</typeparam> internal void Register<TMessage>() { - Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name); + Logr.Log(Level.Info, "Registering message type: {0} {1}", + typeof(TMessage).FullName, typeof(TMessage).Name); IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>(); Serialize serialize = (MemoryStream stream, object message) => @@ -89,7 +106,7 @@ namespace Org.Apache.REEF.Wake.Avro Deserialize deserialize = (MemoryStream stream, object observer, long sequence) => { TMessage message = messageSerializer.Deserialize(stream); - IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>; + var msgObserver = observer as IObserver<IMessageInstance<TMessage>>; if (msgObserver != null) { msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message)); @@ -106,7 +123,8 @@ namespace Org.Apache.REEF.Wake.Avro /// Serialize the input message and return a byte array. /// </summary> /// <param name="message">An object reference to the messeage to be serialized</param> - /// <param name="sequence">A long which cotains the higher level protocols sequence number for the message.</param> + /// <param name="sequence"> + /// A long which cotains the higher level protocols sequence number for the message.</param> /// <returns>A byte array containing the serialized header and message.</returns> public byte[] Write(object message, long sequence) { @@ -126,7 +144,8 @@ namespace Org.Apache.REEF.Wake.Avro } else { - throw new SeializationException("Request to serialize unknown message type: " + name); + throw new SeializationException( + "Request to serialize unknown message type: " + name); } return stream.GetBuffer(); } @@ -141,9 +160,10 @@ namespace Org.Apache.REEF.Wake.Avro /// <summary> /// Read a message from the input byte array. /// </summary> - /// <param name="data">The byte array containing a header message and message to be deserialized.</param> - /// <param name="observer">An object which implements the IObserver<> interface for the message being deserialized.</param> - public void Read(byte[] data, object observer) + /// <param name="data">Byte array containing a header message and message to be deserialized.</param> + /// <param name="observer">An object which implements the IObserver<> + /// interface for the message being deserialized.</param> + public void Read<T>(byte[] data, IObserver<IMessageInstance<T>> observer) { try { @@ -157,7 +177,8 @@ namespace Org.Apache.REEF.Wake.Avro } else { - throw new SeializationException("Request to deserialize unknown message type: " + head.className); + throw new SeializationException( + "Request to deserialize unknown message type: " + head.className); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/184e4d95/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index ea76e5d..6e75ff3 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -1,4 +1,4 @@ -<?xml version="1.0" encoding="utf-8"?> +<?xml version="1.0" encoding="utf-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -62,6 +62,7 @@ under the License. <Link>Properties\SharedAssemblyInfo.cs</Link> </Compile> <Compile Include="AbstractEStage.cs" /> + <Compile Include="Avro\IMessageInstance.cs" /> <Compile Include="Avro\MessageInstance.cs" /> <Compile Include="Avro\Message\Header.cs" /> <Compile Include="Avro\ProtocolSerializer.cs" />
