Repository: reef
Updated Branches:
refs/heads/master efaee7311 -> 643fe9b21
[REEF-1813] Add Avro message protocol serializer
* Added a ProtocolSerializer class to REEF.Wake that automatically
constructs the searializers and callback mechanism for all
of the Avro messages defined in a given C# namespace.
* Added a test case to wake tests that verify the ProtocolSerializer
properly serializes and deserializes Avro messages going through
a two RemoteManager connection.
* Moved header message from bridge clr to wake where serializer lives.
JIRA:
[REEF-1813](https://issues.apache.org/jira/browse/REEF-1813)
Pull request:
This closes #1321
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/643fe9b2
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/643fe9b2
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/643fe9b2
Branch: refs/heads/master
Commit: 643fe9b21c6adc14dee3abf1fdaa0640c30941d4
Parents: efaee73
Author: Doug Service <[email protected]>
Authored: Fri Jun 30 21:16:14 2017 +0000
Committer: Sergiy Matusevych <[email protected]>
Committed: Tue Jul 18 19:47:39 2017 -0700
----------------------------------------------------------------------
lang/common/wake/avro/Header.avsc | 41 +++++
lang/cs/AvroCodeGeneration.targets | 6 +-
.../Message/Header.cs | 74 --------
.../Org.Apache.REEF.Bridge.CLR.csproj | 3 +-
.../Org.Apache.REEF.Bridge.CLR/packages.config | 2 -
.../Message/AvroTestMessage.cs | 45 +++++
.../Org.Apache.REEF.Wake.Tests.csproj | 7 +
.../ProtocolSerializerTest.cs | 109 ++++++++++++
.../Org.Apache.REEF.Wake.Tests/packages.config | 1 +
.../Org.Apache.REEF.Wake/Avro/Message/Header.cs | 74 ++++++++
.../Avro/MessageInstance.cs | 36 ++++
.../Avro/ProtocolSerializer.cs | 171 +++++++++++++++++++
.../Avro/SeializationException.cs | 33 ++++
.../Org.Apache.REEF.Wake.csproj | 15 ++
lang/cs/Org.Apache.REEF.Wake/packages.config | 4 +
lang/cs/nuget.config | 1 +
16 files changed, 541 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/common/wake/avro/Header.avsc
----------------------------------------------------------------------
diff --git a/lang/common/wake/avro/Header.avsc
b/lang/common/wake/avro/Header.avsc
new file mode 100644
index 0000000..8efd4f5
--- /dev/null
+++ b/lang/common/wake/avro/Header.avsc
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+ /*
+ * Identify the next message in the Java/C# bridge protocol.
+ */
+ {
+ "namespace":"org.apache.reef.wake.avro.message",
+ "type":"record",
+ "name":"Header",
+ "doc":"Identifies the following message in a given protocol.",
+ "fields":[
+ {
+ "name":"sequence",
+ "doc":"Sequence number of message.",
+ "type":"long"
+ },
+ {
+ "name":"className",
+ "doc":"The name of the message class.",
+ "type":"string"
+ }
+ ]
+ }
+]
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/AvroCodeGeneration.targets
----------------------------------------------------------------------
diff --git a/lang/cs/AvroCodeGeneration.targets
b/lang/cs/AvroCodeGeneration.targets
index 48a6695..c6599b2 100644
--- a/lang/cs/AvroCodeGeneration.targets
+++ b/lang/cs/AvroCodeGeneration.targets
@@ -131,9 +131,9 @@ under the License.
</UsingTask>
<Target Name="SetupAvroCodeGen" DependsOnTargets="RestorePackages">
<Message Text="Copying Avro code generation files to
$(AvroBinaryDirectory)..." />
- <Copy SourceFiles="$(AvroTools)"
DestinationFolder="$(AvroBinaryDirectory)" />
- <Copy SourceFiles="$(AvroLibrary)"
DestinationFolder="$(AvroBinaryDirectory)" />
- <Copy SourceFiles="$(NewtonsoftLibrary)"
DestinationFolder="$(AvroBinaryDirectory)" />
+ <Copy SourceFiles="$(AvroTools)"
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+ <Copy SourceFiles="$(AvroLibrary)"
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
+ <Copy SourceFiles="$(NewtonsoftLibrary)"
DestinationFolder="$(AvroBinaryDirectory)" SkipUnchangedFiles="true" />
</Target>
<Target Name="CodeGeneration" DependsOnTargets="SetupAvroCodeGen"
BeforeTargets="CoreCompile">
<Message Text="Generating C# classes from Avro avsc files @(Compile)..." />
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
b/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
deleted file mode 100644
index cf0b939..0000000
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Message/Header.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//<auto-generated />
-namespace org.apache.reef.bridge.message
-{
- using System;
- using System.Collections.Generic;
- using System.Runtime.Serialization;
- using Microsoft.Hadoop.Avro;
-
- /// <summary>
- /// Used to serialize and deserialize Avro record
org.apache.reef.bridge.message.Header.
- /// </summary>
- [DataContract(Namespace = "org.apache.reef.bridge.message")]
- public partial class Header
- {
- private const string JsonSchema =
@"{""type"":""record"",""name"":""org.apache.reef.bridge.message.Header"",""doc"":""Identifies
the following message in the Java/C# bridge
protocol."",""fields"":[{""name"":""identifier"",""doc"":""Identifier of the
next message to be
read."",""type"":""long""},{""name"":""className"",""doc"":""The fully
qualified name of the message class."",""type"":""string""}]}";
-
- /// <summary>
- /// Gets the schema.
- /// </summary>
- public static string Schema
- {
- get
- {
- return JsonSchema;
- }
- }
-
- /// <summary>
- /// Gets or sets the identifier field.
- /// </summary>
- [DataMember]
- public long identifier { get; set; }
-
- /// <summary>
- /// Gets or sets the className field.
- /// </summary>
- [DataMember]
- public string className { get; set; }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Header"/> class.
- /// </summary>
- public Header()
- {
- }
-
- /// <summary>
- /// Initializes a new instance of the <see cref="Header"/> class.
- /// </summary>
- /// <param name="identifier">The identifier.</param>
- /// <param name="className">The className.</param>
- public Header(long identifier, string className)
- {
- this.identifier = identifier;
- this.className = className;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
index cc50a6f..753b088 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/Org.Apache.REEF.Bridge.CLR.csproj
@@ -54,7 +54,6 @@ under the License.
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="Message\Acknowledgement.cs" />
- <Compile Include="Message\Header.cs" />
<Compile Include="Message\Protocol.cs" />
<Compile Include="Message\SystemOnStart.cs" />
</ItemGroup>
@@ -89,4 +88,4 @@ under the License.
<Import Project="$(SolutionDir)\AvroCodeGeneration.Targets"
Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets"
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<Import
Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets"
Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')"
/>
-</Project>
+</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
index a0941f8..9f55ae4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
+++ b/lang/cs/Org.Apache.REEF.Bridge.CLR/packages.config
@@ -18,8 +18,6 @@ specific language governing permissions and limitations
under the License.
-->
<packages>
- <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451"
developmentDependency="true" />
- <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451"
developmentDependency="true" />
<package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45"
/>
<package id="Newtonsoft.Json" version="8.0.3" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45"
developmentDependency="true" />
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
new file mode 100644
index 0000000..6da55bd
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Message/AvroTestMessage.cs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+
+namespace org.apache.reef.wake.tests.message
+{
+ /// <summary>
+ /// Internal message header structure used to identify
+ /// the following message in the deserialization stream.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.wake.tests.message")]
+ public class AvroTestMessage
+ {
+ [DataMember]
+ public int number { get; set; }
+
+ [DataMember]
+ public string data { get; set; }
+
+ public AvroTestMessage()
+ {
+ }
+
+ public AvroTestMessage(int number, string data)
+ {
+ this.number = number;
+ this.data = data;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
index e9cb4c6..daf065d 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/Org.Apache.REEF.Wake.Tests.csproj
@@ -36,17 +36,24 @@ under the License.
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
+ <Reference Include="System.Data" />
<Reference Include="System.Reactive.Core">
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
</Reference>
<Reference Include="System.Reactive.Interfaces">
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
</Reference>
+ <Reference Include="System.Runtime.Serialization" />
+ <Reference Include="Microsoft.Hadoop.Avro">
+
<HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
+ <Compile Include="Message\AvroTestMessage.cs" />
+ <Compile Include="ProtocolSerializerTest.cs" />
<Compile Include="ClockTest.cs" />
<Compile Include="MultiCodecTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
new file mode 100644
index 0000000..8f4a1ca
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/ProtocolSerializerTest.cs
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Reactive;
+using Org.Apache.REEF.Wake.Avro;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using org.apache.reef.wake.tests.message;
+using Xunit;
+
+namespace Org.Apache.REEF.Wake.Tests
+{
+ /// <summary>
+ /// Observer to receive and verify test message contents.
+ /// </summary>
+ internal sealed class TestMessageObserver :
IObserver<MessageInstance<AvroTestMessage>>
+ {
+ int number;
+ string data;
+
+ public TestMessageObserver(int number, string data)
+ {
+ this.number = number;
+ this.data = data;
+ }
+
+ public void OnNext(MessageInstance<AvroTestMessage> instance)
+ {
+ Assert.Equal(instance.message.number, this.number);
+ Assert.Equal(instance.message.data, this.data);
+ }
+
+ public void OnError(Exception error)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void OnCompleted()
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ [Collection("FunctionalTests")]
+ public class TestProtocolSerializer
+ {
+ /// <summary>
+ /// Setup two way communication between two remote managers through
the loopback
+ /// network and verify that Avro messages are properly serialized and
deserialzied
+ /// by the ProtocolSerializer class.
+ /// </summary>
+ [Fact]
+ [Trait("Priority", "1")]
+ public void TestTwoWayCommunication()
+ {
+ // Test data.
+ int[] numbers = { 12, 25 };
+ string[] strings = { "The first string", "The second string" };
+
+ IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
+ BlockingCollection<byte[]> queue1 = new
BlockingCollection<byte[]>();
+ BlockingCollection<byte[]> queue2 = new
BlockingCollection<byte[]>();
+
+ ProtocolSerializer serializer = new
ProtocolSerializer(this.GetType().Assembly,
"org.apache.reef.wake.tests.message");
+ IRemoteManagerFactory _remoteManagerFactory =
TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
+
+ using (var remoteManager1 =
_remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+ using (var remoteManager2 =
_remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
+ {
+ // Register observers for remote manager 1 and remote manager 2
+ var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
+ var observer1 = Observer.Create<byte[]>(queue1.Add);
+ var observer2 = Observer.Create<byte[]>(queue2.Add);
+ remoteManager1.RegisterObserver(remoteEndpoint, observer1);
+ remoteManager2.RegisterObserver(remoteEndpoint, observer2);
+
+ // Remote manager 1 sends avro message to remote manager 2
+ var remoteObserver1 =
remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
+ remoteObserver1.OnNext(serializer.Write(new
AvroTestMessage(numbers[0], strings[0]), 1));
+
+ // Remote manager 2 sends avro message to remote manager 1
+ var remoteObserver2 =
remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
+ remoteObserver2.OnNext(serializer.Write(new
AvroTestMessage(numbers[1], strings[1]), 2));
+
+ // Verify the messages are properly received.
+ serializer.Read(queue1.Take(), new
TestMessageObserver(numbers[1], strings[1]));
+ serializer.Read(queue2.Take(), new
TestMessageObserver(numbers[0], strings[0]));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
index 581efe6..e0d4dbf 100644
--- a/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake.Tests/packages.config
@@ -18,6 +18,7 @@ specific language governing permissions and limitations
under the License.
-->
<packages>
+ <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45"
/>
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45"
developmentDependency="true" />
<package id="System.Reactive.Core" version="3.1.1" targetFramework="net451"
/>
<package id="System.Reactive.Interfaces" version="3.1.1"
targetFramework="net451" />
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
new file mode 100644
index 0000000..318515b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/Message/Header.cs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//<auto-generated />
+namespace org.apache.reef.wake.avro.message
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Runtime.Serialization;
+ using Microsoft.Hadoop.Avro;
+
+ /// <summary>
+ /// Used to serialize and deserialize Avro record
org.apache.reef.wake.avro.message.Header.
+ /// </summary>
+ [DataContract(Namespace = "org.apache.reef.wake.avro.message")]
+ public partial class Header
+ {
+ private const string JsonSchema =
@"{""type"":""record"",""name"":""org.apache.reef.wake.avro.message.Header"",""doc"":""Identifies
the following message in a given
protocol."",""fields"":[{""name"":""sequence"",""doc"":""Sequence number of
message."",""type"":""long""},{""name"":""className"",""doc"":""The name of the
message class."",""type"":""string""}]}";
+
+ /// <summary>
+ /// Gets the schema.
+ /// </summary>
+ public static string Schema
+ {
+ get
+ {
+ return JsonSchema;
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the sequence field.
+ /// </summary>
+ [DataMember]
+ public long sequence { get; set; }
+
+ /// <summary>
+ /// Gets or sets the className field.
+ /// </summary>
+ [DataMember]
+ public string className { get; set; }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Header"/> class.
+ /// </summary>
+ public Header()
+ {
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="Header"/> class.
+ /// </summary>
+ /// <param name="sequence">The sequence.</param>
+ /// <param name="className">The className.</param>
+ public Header(long sequence, string className)
+ {
+ this.sequence = sequence;
+ this.className = className;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
new file mode 100644
index 0000000..96ac8a4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/MessageInstance.cs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// Wrapper class to bind a specific instance of an Avro messagage
+ /// with it associated sequence number.
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ public struct MessageInstance<T>
+ {
+ public long sequence;
+ public T message;
+
+ public MessageInstance(long sequence, T message)
+ {
+ this.sequence = sequence;
+ this.message = message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
new file mode 100644
index 0000000..dff30b1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/ProtocolSerializer.cs
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Reflection;
+using Microsoft.Hadoop.Avro;
+using Org.Apache.REEF.Utilities.Logging;
+using org.apache.reef.wake.avro.message;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// Given a namespace of Avro messages which represent a protocol, the
ProtocolSerailizer
+ /// class reflects all of the message classes and builds Avro seriailziers
and deserializers.
+ /// Avro messages are then serialized/deserilaized to and from byte[]
arrays using the
+ /// Read/Write methods. A transport such as a RemoteObserver using a
ByteCodec can then
+ /// be used to send and receive the serialized messages.
+ /// </summary>
+ public sealed class ProtocolSerializer
+ {
+ private static readonly Logger Logr =
Logger.GetLogger(typeof(ProtocolSerializer));
+
+ // Delagates for message serializers and deserializers.
+ private delegate void Serialize(MemoryStream stream, object message);
+ private delegate void Deserialize(MemoryStream stream, object
observer, long sequence);
+
+ // Message type to serialize/derserialize delagate.
+ private readonly SortedDictionary<string, Serialize> serializeMap =
new SortedDictionary<string, Serialize>();
+ private readonly SortedDictionary<string, Deserialize> deserializeMap
= new SortedDictionary<string, Deserialize>();
+
+ private readonly IAvroSerializer<Header> headerSerializer =
AvroSerializer.Create<Header>();
+
+ /// <summary>
+ /// Register all of the protocol messages using reflection.
+ /// </summary>
+ /// <param name="assembly">The Assembley object which contains the
namespace of the message classes.</param>
+ /// <param name="messageNamespace">A string which contains the
namespace the protocol messages.</param>
+ public ProtocolSerializer(Assembly assembly, string messageNamespace)
+ {
+ MethodInfo registerInfo =
typeof(ProtocolSerializer).GetMethod("Register", BindingFlags.Instance |
BindingFlags.NonPublic);
+ MethodInfo genericInfo;
+
+ Logr.Log(Level.Info, "Retrieving types for assembly: {0}",
assembly.FullName);
+ List<Type> types = new List<Type>(assembly.GetTypes());
+ types.Add(typeof(Header));
+
+ foreach (Type type in types)
+ {
+ string name = type.FullName;
+ if (name.StartsWith(messageNamespace))
+ {
+ genericInfo = registerInfo.MakeGenericMethod(new[] { type
});
+ genericInfo.Invoke(this, null);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Generate and store the metadata necessary to serialze and
deserialize a specific message type.
+ /// </summary>
+ /// <typeparam name="TMessage">The class type of the message being
registered.</typeparam>
+ internal void Register<TMessage>()
+ {
+ Logr.Log(Level.Info, "Registering message type: {0} {1}",
typeof(TMessage).FullName, typeof(TMessage).Name);
+
+ IAvroSerializer<TMessage> messageSerializer =
AvroSerializer.Create<TMessage>();
+ Serialize serialize = (MemoryStream stream, object message) =>
+ {
+ messageSerializer.Serialize(stream, (TMessage)message);
+ };
+ serializeMap.Add(typeof(TMessage).Name, serialize);
+
+ Deserialize deserialize = (MemoryStream stream, object observer,
long sequence) =>
+ {
+ TMessage message = messageSerializer.Deserialize(stream);
+ IObserver<MessageInstance<TMessage>> msgObserver = observer as
IObserver<MessageInstance<TMessage>>;
+ if (msgObserver != null)
+ {
+ msgObserver.OnNext(new MessageInstance<TMessage>(sequence,
message));
+ }
+ else
+ {
+ Logr.Log(Level.Warning, "Unhandled message received: {0}",
message);
+ }
+ };
+ deserializeMap.Add(typeof(TMessage).Name, deserialize);
+ }
+
+ /// <summary>
+ /// Serialize the input message and return a byte array.
+ /// </summary>
+ /// <param name="message">An object reference to the messeage to be
serialized</param>
+ /// <param name="sequence">A long which cotains the higher level
protocols sequence number for the message.</param>
+ /// <returns>A byte array containing the serialized header and
message.</returns>
+ public byte[] Write(object message, long sequence)
+ {
+ string name = message.GetType().Name;
+ Logr.Log(Level.Info, "Serializing message: {0}", name);
+ try
+ {
+ using (MemoryStream stream = new MemoryStream())
+ {
+ Header header = new Header(sequence, name);
+ headerSerializer.Serialize(stream, header);
+
+ Serialize serialize;
+ if (serializeMap.TryGetValue(name, out serialize))
+ {
+ serialize(stream, message);
+ }
+ else
+ {
+ throw new SeializationException("Request to serialize
unknown message type: " + name);
+ }
+ return stream.GetBuffer();
+ }
+ }
+ catch (Exception e)
+ {
+ Logr.Log(Level.Error, "Failure writing message.", e);
+ throw e;
+ }
+ }
+
+ /// <summary>
+ /// Read a message from the input byte array.
+ /// </summary>
+ /// <param name="data">The byte array containing a header message and
message to be deserialized.</param>
+ /// <param name="observer">An object which implements the IObserver<>
interface for the message being deserialized.</param>
+ public void Read(byte[] data, object observer)
+ {
+ try
+ {
+ using (MemoryStream stream = new MemoryStream(data))
+ {
+ Header head = headerSerializer.Deserialize(stream);
+ Deserialize deserialize;
+ if (deserializeMap.TryGetValue(head.className, out
deserialize))
+ {
+ deserialize(stream, observer, head.sequence);
+ }
+ else
+ {
+ throw new SeializationException("Request to
deserialize unknown message type: " + head.className);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ Logr.Log(Level.Error, "Failure reading message.", e);
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
new file mode 100644
index 0000000..4ecc4d9
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Wake/Avro/SeializationException.cs
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+using System;
+
+namespace Org.Apache.REEF.Wake.Avro
+{
+ /// <summary>
+ /// The SeializationException is generated when an Avro serializer is
+ /// requested to send or receive a message that does not exist is in
+ /// message namespace it was given at construction time.
+ /// </summary>
+ class SeializationException : Exception
+ {
+ public SeializationException(string message) : base(message)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
index a380400..9dcc039 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
+++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj
@@ -27,9 +27,17 @@ under the License.
<FileAlignment>512</FileAlignment>
<RestorePackages>true</RestorePackages>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) ==
'*Undefined*'">..</SolutionDir>
+ <AvroBinaryDirectory>..\packages\AvroBin</AvroBinaryDirectory>
+ <AvroSchemaDirectory>..\..\common\wake\avro</AvroSchemaDirectory>
+
<AvroTools>..\packages\Microsoft.Avro.Tools.0.1.0\lib\net451\Microsoft.Avro.Tools.exe</AvroTools>
+
<AvroLibrary>..\packages\Microsoft.Avro.Core.0.1.0\lib\net451\Microsoft.Avro.Core.dll</AvroLibrary>
+
<NewtonsoftLibrary>..\packages\Newtonsoft.Json.10.0.3\lib\net45\Newtonsoft.Json.dll</NewtonsoftLibrary>
</PropertyGroup>
<Import Project="$(SolutionDir)\build.props" />
<ItemGroup>
+ <Reference Include="Microsoft.Hadoop.Avro">
+
<HintPath>$(PackagesDir)\Microsoft.Hadoop.Avro.$(AvroVersion)\lib\net45\Microsoft.Hadoop.Avro.dll</HintPath>
+ </Reference>
<Reference Include="Microsoft.Practices.TransientFaultHandling.Core,
Version=5.1.1209.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35,
processorArchitecture=MSIL">
<HintPath>$(PackagesDir)\TransientFaultHandling.Core.5.1.1209.1\lib\NET4\Microsoft.Practices.TransientFaultHandling.Core.dll</HintPath>
</Reference>
@@ -38,6 +46,7 @@ under the License.
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
+ <Reference Include="System.Data" />
<Reference Include="System.Reactive.Core">
<HintPath>$(PackagesDir)\System.Reactive.Core.$(SystemReactiveVersion)\lib\net45\System.Reactive.Core.dll</HintPath>
<Private>True</Private>
@@ -45,6 +54,7 @@ under the License.
<Reference Include="System.Reactive.Interfaces">
<HintPath>$(PackagesDir)\System.Reactive.Interfaces.$(SystemReactiveVersion)\lib\net45\System.Reactive.Interfaces.dll</HintPath>
</Reference>
+ <Reference Include="System.Runtime.Serialization" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
@@ -52,6 +62,10 @@ under the License.
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="AbstractEStage.cs" />
+ <Compile Include="Avro\MessageInstance.cs" />
+ <Compile Include="Avro\Message\Header.cs" />
+ <Compile Include="Avro\ProtocolSerializer.cs" />
+ <Compile Include="Avro\SeializationException.cs" />
<Compile Include="Examples\P2p\IEventSource.cs" />
<Compile Include="Examples\P2p\Pull2Push.cs" />
<Compile Include="IEStage.cs" />
@@ -205,5 +219,6 @@ under the License.
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Import Project="$(SolutionDir)\.nuget\NuGet.targets"
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+ <Import Project="$(SolutionDir)\AvroCodeGeneration.Targets"
Condition="Exists('$(SolutionDir)\AvroCodeGeneration.Targets')" />
<Import
Project="$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets"
Condition="Exists('$(PackagesDir)\StyleCop.MSBuild.$(StyleCopVersion)\build\StyleCop.MSBuild.Targets')"
/>
</Project>
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/Org.Apache.REEF.Wake/packages.config
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/packages.config
b/lang/cs/Org.Apache.REEF.Wake/packages.config
index e176d57..beb1da1 100644
--- a/lang/cs/Org.Apache.REEF.Wake/packages.config
+++ b/lang/cs/Org.Apache.REEF.Wake/packages.config
@@ -18,6 +18,10 @@ specific language governing permissions and limitations
under the License.
-->
<packages>
+ <package id="Microsoft.Avro.Core" version="0.1.0" targetFramework="net451"
developmentDependency="true" />
+ <package id="Microsoft.Avro.Tools" version="0.1.0" targetFramework="net451"
developmentDependency="true" />
+ <package id="Newtonsoft.Json" version="10.0.3" targetFramework="net451"
developmentDependency="true" />
+ <package id="Microsoft.Hadoop.Avro" version="1.5.6" targetFramework="net45"
/>
<package id="protobuf-net" version="2.0.0.668" targetFramework="net45" />
<package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45"
developmentDependency="true" />
<package id="System.Reactive.Core" version="3.1.1" targetFramework="net451"
/>
http://git-wip-us.apache.org/repos/asf/reef/blob/643fe9b2/lang/cs/nuget.config
----------------------------------------------------------------------
diff --git a/lang/cs/nuget.config b/lang/cs/nuget.config
index ddd79f7..79317f3 100644
--- a/lang/cs/nuget.config
+++ b/lang/cs/nuget.config
@@ -19,6 +19,7 @@ under the License.
-->
<configuration>
<packageSources>
+ <add key="nuget.org" value="https://www.nuget.org/api/V2" />
<add key="dotnet"
value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json" />
</packageSources>
</configuration>