Repository: reef
Updated Branches:
  refs/heads/master efaee7311 -> 643fe9b21


[REEF-1813] Add Avro message protocol serializer

   * Added a ProtocolSerializer class to REEF.Wake that automatically
     constructs the serializers and callback mechanism for all
     of the Avro messages defined in a given C# namespace.
   * Added a test case to the wake tests that verifies the ProtocolSerializer
     properly serializes and deserializes Avro messages sent over a
     connection between two RemoteManagers.
   * Moved header message from bridge clr to wake where serializer lives.

JIRA:
  [REEF-1813](https://issues.apache.org/jira/browse/REEF-1813)

 Pull request:
   This closes #1321


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/643fe9b2
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/643fe9b2
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/643fe9b2

Branch: refs/heads/master
Commit: 643fe9b21c6adc14dee3abf1fdaa0640c30941d4
Parents: efaee73
Author: Doug Service <[email protected]>
Authored: Fri Jun 30 21:16:14 2017 +0000
Committer: Sergiy Matusevych <[email protected]>
Committed: Tue Jul 18 19:47:39 2017 -0700

----------------------------------------------------------------------
 lang/common/wake/avro/Header.avsc               |  41 +++++
 lang/cs/AvroCodeGeneration.targets              |   6 +-
 .../Message/Header.cs                           |  74 --------
 .../Org.Apache.REEF.Bridge.CLR.csproj           |   3 +-
 .../Org.Apache.REEF.Bridge.CLR/packages.config  |   2 -
 .../Message/AvroTestMessage.cs                  |  45 +++++
 .../Org.Apache.REEF.Wake.Tests.csproj           |   7 +
 .../ProtocolSerializerTest.cs                   | 109 ++++++++++++
 .../Org.Apache.REEF.Wake.Tests/packages.config  |   1 +
 .../Org.Apache.REEF.Wake/Avro/Message/Header.cs |  74 ++++++++
 .../Avro/MessageInstance.cs                     |  36 ++++
 .../Avro/ProtocolSerializer.cs                  | 171 +++++++++++++++++++
 .../Avro/SeializationException.cs               |  33 ++++
 .../Org.Apache.REEF.Wake.csproj                 |  15 ++
 lang/cs/Org.Apache.REEF.Wake/packages.config    |   4 +
 lang/cs/nuget.config                            |   1 +
 16 files changed, 541 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/common/wake/avro/Header.avsc
----------------------------------------------------------------------
diff --git a/lang/common/wake/avro/Header.avsc 
b/lang/common/wake/avro/Header.avsc
new file mode 100644
index 0000000..8efd4f5
--- /dev/null
+++ b/lang/common/wake/avro/Header.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+  /*
+   * Identify the next message in the Java/C# bridge protocol.
+   */
+  {
+    "namespace":"org.apache.reef.wake.avro.message",
+    "type":"record",
+    "name":"Header",
+    "doc":"Identifies the following message in a given protocol.",
+    "fields":[
+      {
+        "name":"sequence",
+        "doc":"Sequence number of message.",
+        "type":"long"
+      },
+      {
+        "name":"className",
+        "doc":"The name of the message class.",
+        "type":"string"
+      }
+    ]
+  }
+]

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/AvroCodeGeneration.targets
----------------------------------------------------------------------
diff --git a/lang/cs/AvroCodeGeneration.targets 
b/lang/cs/AvroCodeGeneration.targets
index 48a6695..c6599b2 100644
--- a/lang/cs/AvroCodeGeneration.targets
+++ b/lang/cs/AvroCodeGeneration.targets
@@ -131,9 +131,9 @@ under the License.
   </UsingTask>
   <Target Name="SetupAvroCodeGen" DependsOnTargets="RestorePackages">
     <Message Text="Copying Avro code generation files to 
$(AvroBinaryDirectory)..." />
-    <Copy SourceFiles="$(AvroTools)" 
DestinationFolder="$(AvroBinaryDirectory)" />
-    <Copy SourceFiles="$(AvroLibrary)" 
DestinationFolder="$(AvroBinaryDirectory)" />
-    <Copy SourceFiles="$(NewtonsoftLibrary)" 
DestinationFolder="$(AvroBinaryDirectory)" />
+    <Copy SourceFiles="$(AvroTools)" 
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+    <Copy SourceFiles="$(AvroLibrary)" 
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+    <Copy SourceFiles="$(NewtonsoftLibrary)" 
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
   </Target>
   <Target Name="CodeGeneration" DependsOnTargets="SetupAvroCodeGen" 
BeforeTargets="CoreCompile">
     <Message Text="Generating C# classes from Avro avsc files @(Compile)..." />

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs 
b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
deleted file mode 100644
index cf0b939..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Runtime.Serialization;
-    using Microsoft.Hadoop.Avro;
-
-    /// <summary>
-    /// Used to serialize and deserialize Avro record 
org.apache.reef.bridge.message.Header.
-    /// </summary>
-    [DataContract(Namespace = "org.apache.reef.bridge.message")]
-    public partial class Header
-    {
-        private const string JsonSchema = 
@"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Header"",""doc"":""Identifies
 the following message in the Java/C# bridge 
protocol."",""fields"":[{""name"":""identifier"",""doc"":""Identifier of the 
next message to be 
read."",""type"":""long""},{""name"":""className"",""doc"":""The fully 
qualified name of the message class."",""type"":""string""}]}";
-
-        /// <summary>
-        /// Gets the schema.
-        /// </summary>
-        public static string Schema
-        {
-            get
-            {
-                return JsonSchema;
-            }
-        }
-      
-        /// <summary>
-        /// Gets or sets the identifier field.
-        /// </summary>
-        [DataMember]
-        public long identifier { get; set; }
-              
-        /// <summary>
-        /// Gets or sets the className field.
-        /// </summary>
-        [DataMember]
-        public string className { get; set; }
-                
-        /// <summary>
-        /// Initializes a new instance of the <see cref="Header"/> class.
-        /// </summary>
-        public Header()
-        {
-        }
-
-        /// <summary>
-        /// Initializes a new instance of the <see cref="Header"/> class.
-        /// </summary>
-        /// <param name="identifier">The identifier.</param>
-        /// <param name="className">The className.</param>
-        public Header(long identifier, string className)
-        {
-            this.identifier = identifier;
-            this.className = className;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj 
b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index cc50a6f..753b088 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -54,7 +54,6 @@ under the License.
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="Message\Acknowledgement.cs" />
-    <Compile Include="Message\Header.cs" />
     <Compile Include="Message\Protocol.cs" />
     <Compile Include="Message\SystemOnStart.cs" />
   </ItemGroup>
@@ -89,4 +88,4 @@ under the License.
   <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" 
Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" 
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
   <Import 
Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets"
 
Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')"
 />
-</Project>
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config 
b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
index a0941f8..9f55ae4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
@@ -18,8 +18,6 @@ specific language governing permissions and limitations
 under the License.
 -->
 <packages>
-  <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" 
developmentDependency="true" />
-  <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" 
developmentDependency="true" />
   <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" 
/>
   <package id="Newtonsoft.Json" version="8.0.3" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" 
developmentDependency="true" />

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
new file mode 100644
index 0000000..6da55bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+
+namespace org.apache.reef.wake.tests.message
+{
+    /// <summary>
+    /// Simple Avro data contract carrying an integer and a string; it is the
+    /// message payload that the ProtocolSerializer test serializes and verifies.
+    /// </summary>
+    [DataContract(Namespace = "org.apache.reef.wake.tests.message")]
+    public class AvroTestMessage
+    {
+        [DataMember]
+        public int number { get; set; }
+
+        [DataMember]
+        public string data { get; set; }
+
+        public AvroTestMessage()
+        {
+        }
+
+        public AvroTestMessage(int number, string data)
+        {
+            this.number = number;
+            this.data = data;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index e9cb4c6..daf065d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -36,17 +36,24 @@ under the License.
   <ItemGroup>
     <Reference Include="System" />
     <Reference Include="System.Core" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Reactive.Core">
       
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
     </Reference>
     <Reference Include="System.Reactive.Interfaces">
       
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
     </Reference>
+    <Reference Include="System.Runtime.Serialization" />
+    <Reference Include="Microsoft.Hadoop.Avro">
+      
<HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
   </ItemGroup>
   <ItemGroup>
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="Message\AvroTestMessage.cs" />
+    <Compile Include="ProtocolSerializerTest.cs" />
     <Compile Include="ClockTest.cs" />
     <Compile Include="MultiCodecTest.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs 
b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
new file mode 100644
index 0000000..8f4a1ca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Reactive;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using org.apache.reef.wake.tests.message;
+using Xunit;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+    /// <summary>
+    /// Observer that checks a received test message against the expected field values.
+    /// </summary>
+    internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
+    {
+        private readonly int number;    // Expected value of the message's number field.
+        private readonly string data;   // Expected value of the message's data field.
+
+        public TestMessageObserver(int number, string data)
+        {
+            this.number = number;
+            this.data = data;
+        }
+
+        public void OnNext(MessageInstance<AvroTestMessage> instance)
+        {
+            Assert.Equal(this.number, instance.message.number); // Argument order is (expected, actual).
+            Assert.Equal(this.data, instance.message.data);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+
+    [Collection("FunctionalTests")]
+    public class TestProtocolSerializer
+    {
+        /// <summary>
+        /// Setup two way communication between two remote managers through the loopback
+        /// network and verify that Avro messages are properly serialized and deserialized
+        /// by the ProtocolSerializer class.
+        /// </summary>
+        [Fact]
+        [Trait("Priority", "1")]
+        public void TestTwoWayCommunication()
+        {
+            // Test data.
+            int[] numbers = { 12, 25 };
+            string[] strings = { "The first string", "The second string" };
+
+            IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+            BlockingCollection<byte[]> queue1 = new 
BlockingCollection<byte[]>();
+            BlockingCollection<byte[]> queue2 = new 
BlockingCollection<byte[]>();
+
+            ProtocolSerializer serializer = new 
ProtocolSerializer(this.GetType().Assembly, 
"org.apache.reef.wake.tests.message");
+            IRemoteManagerFactory _remoteManagerFactory = 
TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+
+            using (var remoteManager1 = 
_remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+            using (var remoteManager2 = 
_remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+            {
+                // Register observers that enqueue the raw bytes received by each remote manager.
+                var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+                var observer1 = Observer.Create<byte[]>(queue1.Add);
+                var observer2 = Observer.Create<byte[]>(queue2.Add);
+                remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+                remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+                // Remote manager 1 sends avro message to remote manager 2
+                var remoteObserver1 = 
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+                remoteObserver1.OnNext(serializer.Write(new 
AvroTestMessage(numbers[0], strings[0]), 1));
+
+                // Remote manager 2 sends avro message to remote manager 1
+                var remoteObserver2 = 
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+                remoteObserver2.OnNext(serializer.Write(new 
AvroTestMessage(numbers[1], strings[1]), 2));
+
+                // Verify the messages are properly received; Take() blocks until a message is queued.
+                serializer.Read(queue1.Take(), new 
TestMessageObserver(numbers[1], strings[1]));
+                serializer.Read(queue2.Take(), new 
TestMessageObserver(numbers[0], strings[0]));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config 
b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
index 581efe6..e0d4dbf 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -18,6 +18,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 <packages>
+  <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" 
/>
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" 
developmentDependency="true" />
   <package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" 
/>
   <package id="System.Reactive.Interfaces" version="3.1.1" 
targetFramework="net451" />

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs 
b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
new file mode 100644
index 0000000..318515b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//<auto-generated />
+namespace org.apache.reef.wake.avro.message
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Runtime.Serialization;
+    using Microsoft.Hadoop.Avro;
+
+    /// <summary>
+    /// Used to serialize and deserialize Avro record 
org.apache.reef.wake.avro.message.Header.
+    /// </summary>
+    [DataContract(Namespace = "org.apache.reef.wake.avro.message")]
+    public partial class Header
+    {
+        private const string JsonSchema = 
@"{""type"":""record"",""name"":""org.apache.reef.wake.avro.message.Header"",""doc"":""Identifies
 the following message in a given 
protocol."",""fields"":[{""name"":""sequence"",""doc"":""Sequence number of 
message."",""type"":""long""},{""name"":""className"",""doc"":""The name of the 
message class."",""type"":""string""}]}";
+
+        /// <summary>
+        /// Gets the schema.
+        /// </summary>
+        public static string Schema
+        {
+            get
+            {
+                return JsonSchema;
+            }
+        }
+      
+        /// <summary>
+        /// Gets or sets the sequence field.
+        /// </summary>
+        [DataMember]
+        public long sequence { get; set; }
+              
+        /// <summary>
+        /// Gets or sets the className field.
+        /// </summary>
+        [DataMember]
+        public string className { get; set; }
+                
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Header"/> class.
+        /// </summary>
+        public Header()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Header"/> class.
+        /// </summary>
+        /// <param name="sequence">The sequence.</param>
+        /// <param name="className">The className.</param>
+        public Header(long sequence, string className)
+        {
+            this.sequence = sequence;
+            this.className = className;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs 
b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
new file mode 100644
index 0000000..96ac8a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// Wrapper struct to bind a specific instance of an Avro message
+    /// with its associated sequence number.
+    /// </summary>
+    /// <typeparam name="T">The type of the wrapped message.</typeparam>
+    public struct MessageInstance<T>
+    {
+        public long sequence;
+        public T message;
+
+        public MessageInstance(long sequence, T message)
+        {
+            this.sequence = sequence;
+            this.message = message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
new file mode 100644
index 0000000..dff30b1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Utilities.Logging;
+using org.apache.reef.wake.avro.message;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// Given a namespace of Avro messages which represent a protocol, the ProtocolSerializer
+    /// class reflects all of the message classes and builds Avro serializers and deserializers.
+    /// Avro messages are then serialized/deserialized to and from byte[] arrays using the
+    /// Read/Write methods. A transport such as a RemoteObserver using a ByteCodec can then
+    /// be used to send and receive the serialized messages.
+    /// </summary>
+    public sealed class ProtocolSerializer
+    {
+        private static readonly Logger Logr = Logger.GetLogger(typeof(ProtocolSerializer));
+
+        // Delegates for message serializers and deserializers.
+        private delegate void Serialize(MemoryStream stream, object message);
+        private delegate void Deserialize(MemoryStream stream, object observer, long sequence);
+
+        // Maps from the short message class name to its serialize/deserialize delegate.
+        private readonly SortedDictionary<string, Serialize> serializeMap = new SortedDictionary<string, Serialize>();
+        private readonly SortedDictionary<string, Deserialize> deserializeMap = new SortedDictionary<string, Deserialize>();
+
+        private readonly IAvroSerializer<Header> headerSerializer = AvroSerializer.Create<Header>();
+
+        /// <summary>
+        /// Register all of the protocol messages using reflection.
+        /// </summary>
+        /// <param name="assembly">The Assembly object which contains the namespace of the message classes.</param>
+        /// <param name="messageNamespace">A string which contains the namespace of the protocol messages.</param>
+        public ProtocolSerializer(Assembly assembly, string messageNamespace)
+        {
+            MethodInfo registerInfo = typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance | BindingFlags.NonPublic);
+
+            Logr.Log(Level.Info, "Retrieving types for assembly: {0}", assembly.FullName);
+            // A set is used because Header may already be among the assembly's types; registering it twice would throw.
+            HashSet<Type> types = new HashSet<Type>(assembly.GetTypes());
+            types.Add(typeof(Header));
+
+            foreach (Type type in types)
+            {
+                string name = type.FullName;
+                if (name.StartsWith(messageNamespace, StringComparison.Ordinal))
+                {
+                    MethodInfo genericInfo = registerInfo.MakeGenericMethod(new[] { type });
+                    genericInfo.Invoke(this, null);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Generate and store the metadata necessary to serialize and deserialize a specific message type.
+        /// </summary>
+        /// <typeparam name="TMessage">The class type of the message being registered.</typeparam>
+        internal void Register<TMessage>()
+        {
+            Logr.Log(Level.Info, "Registering message type: {0} {1}", typeof(TMessage).FullName, typeof(TMessage).Name);
+
+            IAvroSerializer<TMessage> messageSerializer = AvroSerializer.Create<TMessage>();
+            Serialize serialize = (MemoryStream stream, object message) =>
+            {
+                messageSerializer.Serialize(stream, (TMessage)message);
+            };
+            serializeMap.Add(typeof(TMessage).Name, serialize);
+
+            Deserialize deserialize = (MemoryStream stream, object observer, long sequence) =>
+            {
+                TMessage message = messageSerializer.Deserialize(stream);
+                IObserver<MessageInstance<TMessage>> msgObserver = observer as IObserver<MessageInstance<TMessage>>;
+                if (msgObserver != null)
+                {
+                    msgObserver.OnNext(new MessageInstance<TMessage>(sequence, message));
+                }
+                else
+                {
+                    Logr.Log(Level.Warning, "Unhandled message received: {0}", message);
+                }
+            };
+            deserializeMap.Add(typeof(TMessage).Name, deserialize);
+        }
+
+        /// <summary>
+        /// Serialize the input message and return a byte array.
+        /// </summary>
+        /// <param name="message">An object reference to the message to be serialized.</param>
+        /// <param name="sequence">A long which contains the higher level protocol's sequence number for the message.</param>
+        /// <returns>A byte array containing the serialized header and message.</returns>
+        public byte[] Write(object message, long sequence)
+        {
+            string name = message.GetType().Name;
+            Logr.Log(Level.Info, "Serializing message: {0}", name);
+            try
+            {
+                using (MemoryStream stream = new MemoryStream())
+                {
+                    Header header = new Header(sequence, name);
+                    headerSerializer.Serialize(stream, header);
+
+                    Serialize serialize;
+                    if (serializeMap.TryGetValue(name, out serialize))
+                    {
+                        serialize(stream, message);
+                    }
+                    else
+                    {
+                        throw new SeializationException("Request to serialize unknown message type: " + name);
+                    }
+                    return stream.ToArray(); // Only the bytes written; GetBuffer() also exposes unused capacity.
+                }
+            }
+            catch (Exception e)
+            {
+                Logr.Log(Level.Error, "Failure writing message.", e);
+                throw; // Rethrow without resetting the original stack trace.
+            }
+        }
+
+        /// <summary>
+        /// Read a message from the input byte array.
+        /// </summary>
+        /// <param name="data">The byte array containing a header message and message to be deserialized.</param>
+        /// <param name="observer">An object which implements the IObserver{T} interface for the message being deserialized.</param>
+        public void Read(byte[] data, object observer)
+        {
+            try
+            {
+                using (MemoryStream stream = new MemoryStream(data))
+                {
+                    Header head = headerSerializer.Deserialize(stream);
+                    Deserialize deserialize;
+                    if (deserializeMap.TryGetValue(head.className, out deserialize))
+                    {
+                        deserialize(stream, observer, head.sequence);
+                    }
+                    else
+                    {
+                        throw new SeializationException("Request to deserialize unknown message type: " + head.className);
+                    }
+                }
+            }
+            catch (Exception e)
+            {
+                Logr.Log(Level.Error, "Failure reading message.", e);
+                throw; // Rethrow without resetting the original stack trace.
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs 
b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
new file mode 100644
index 0000000..4ecc4d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+    /// <summary>
+    /// The SeializationException is thrown when an Avro serializer is
+    /// requested to send or receive a message that does not exist in the
+    /// message namespace it was given at construction time.
+    /// </summary>
+    public class SeializationException : Exception
+    {
+        public SeializationException(string message) : base(message)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj 
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index a380400..9dcc039 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -27,9 +27,17 @@ under the License.
     <FileAlignment>512</FileAlignment>
     <RestorePackages>true</RestorePackages>
     <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == 
'*Undefined*'">..</SolutionDir>
+    <AvroBinaryDirectory>..\packages\AvroBin</AvroBinaryDirectory>
+    <AvroSchemaDirectory>..\..\common\wake\avro</AvroSchemaDirectory>
+    <AvroTools>..\packages\Microsoft.Avro.Tools.0.1.0\lib\net451\Microsoft.Avro.Tools.exe</AvroTools>
+    <AvroLibrary>..\packages\Microsoft.Avro.Core.0.1.0\lib\net451\Microsoft.Avro.Core.dll</AvroLibrary>
+    <NewtonsoftLibrary>..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll</NewtonsoftLibrary>
   </PropertyGroup>
   <Import Project="$(SolutionDir)\build.props" />
   <ItemGroup>
+    <Reference Include="Microsoft.Hadoop.Avro">
+      <HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+    </Reference>
     <Reference Include="Microsoft.Practices.TransientFaultHandling.Core, 
Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, 
processorArchitecture=MSIL">
       
<HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath>
     </Reference>
@@ -38,6 +46,7 @@ under the License.
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Reactive.Core">
       
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
       <Private>True</Private>
@@ -45,6 +54,7 @@ under the License.
     <Reference Include="System.Reactive.Interfaces">
       
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
     </Reference>
+    <Reference Include="System.Runtime.Serialization" />
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
@@ -52,6 +62,10 @@ under the License.
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
     <Compile Include="AbstractEStage.cs" />
+    <Compile Include="Avro\MessageInstance.cs" />
+    <Compile Include="Avro\Message\Header.cs" />
+    <Compile Include="Avro\ProtocolSerializer.cs" />
+    <Compile Include="Avro\SeializationException.cs" />
     <Compile Include="Examples\P2p\IEventSource.cs" />
     <Compile Include="Examples\P2p\Pull2Push.cs" />
     <Compile Include="IEStage.cs" />
@@ -205,5 +219,6 @@ under the License.
   </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" 
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets" Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
   <Import 
Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets"
 
Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')"
 />
 </Project>

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config 
b/lang/cs/Org.Apache.REEF.Wake/packages.config
index e176d57..beb1da1 100644
--- a/lang/cs/Org.Apache.REEF.Wake/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake/packages.config
@@ -18,6 +18,10 @@ specific language governing permissions and limitations
 under the License.
 -->
 <packages>
+  <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+  <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451" developmentDependency="true" />
+  <package id="Newtonsoft.Json" version="10.0.3" targetFramework="net451" developmentDependency="true" />
+  <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45" />
   <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
   <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" 
developmentDependency="true" />
   <package id="System.Reactive.Core" version="3.1.1" targetFramework="net451" 
/>

http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/nuget.config
----------------------------------------------------------------------
diff --git a/lang/cs/nuget.config b/lang/cs/nuget.config
index ddd79f7..79317f3 100644
--- a/lang/cs/nuget.config
+++ b/lang/cs/nuget.config
@@ -19,6 +19,7 @@ under the License.
 -->
 <configuration>
     <packageSources>
+        <add key="nuget.org" value="https://www.nuget.org/api/V2" />
         <add key="dotnet" value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json" />
     </packageSources>
 </configuration>

Reply via email to