Repository: reef
Updated Branches:
  refs/heads/master 1e62c9dcc -> ffa976a70


[REEF-796] Do Avro (de)serialization for FailedTask in bridge

This addresses the issue by
  * Adding AvroFailedTask.
  * Serializing and deserializing FailedTask info from C# properly, without
    using ad-hoc serialization.

JIRA:
  [REEF-796](https://issues.apache.org/jira/browse/REEF-796)

Pull request:
  This closes #960


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ffa976a7
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ffa976a7
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ffa976a7

Branch: refs/heads/master
Commit: ffa976a70643641608405e2e221d5cf7b0d02388
Parents: 1e62c9d
Author: Andrew Chung <[email protected]>
Authored: Tue Apr 19 17:20:39 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Apr 25 13:20:22 2016 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |  2 +-
 .../FailedTaskClr2Java.cpp                      | 24 +++---
 lang/cs/Org.Apache.REEF.Client/Avro/README.md   |  2 +-
 .../Avro/AvroFailedTask.cs                      | 90 ++++++++++++++++++++
 lang/cs/Org.Apache.REEF.Common/Avro/README.md   | 19 +++++
 .../Org.Apache.REEF.Common.csproj               |  2 +
 .../Runtime/Evaluator/Task/TaskRuntime.cs       | 40 ++++++---
 .../Runtime/Evaluator/Task/TaskStatus.cs        | 15 +++-
 .../Bridge/Avro/README.md                       | 19 +++++
 .../Bridge/Clr2java/IFailedTaskClr2Java.cs      |  2 +-
 .../Bridge/Events/FailedTask.cs                 | 72 +++-------------
 .../Org.Apache.REEF.Driver.csproj               |  1 +
 .../Bridge/TestFailedTaskEventHandler.cs        | 16 +++-
 .../reef-bridge-java/src/main/avro/Bridge.avsc  | 51 +++++++++++
 .../reef/javabridge/FailedTaskBridge.java       | 77 ++++++++++++-----
 15 files changed, 324 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index d7eb713..d00ae59 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -128,7 +128,7 @@ namespace Org {
                             !FailedTaskClr2Java();
                             virtual void OnError(String^ message);
                             virtual IActiveContextClr2Java^ GetActiveContext();
-                            virtual String^ GetString();
+                            virtual array<byte>^ GetFailedTaskSerializedAvro();
                         };
 
                         public ref class RunningTaskClr2Java : public 
IRunningTaskClr2Java {

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp
index 887c7a9..91443c2 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp
@@ -60,29 +60,29 @@ namespace Org {
 
                                                  
ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetActiveContext");
 
-                          if (jobjectActiveContext == NULL) {
-                              return nullptr;
-                          }
+                                                 if (jobjectActiveContext == 
NULL) {
+                                                         return nullptr;
+                                                 }
 
                                                  return gcnew 
ActiveContextClr2Java(env, jobjectActiveContext);
                                          }
 
-                                         String^ 
FailedTaskClr2Java::GetString() {
-                                                 
ManagedLog::LOGGER->LogStart("FailedTaskClr2Java::GetString");
+                                         array<byte>^ 
FailedTaskClr2Java::GetFailedTaskSerializedAvro() {
+                                                 
ManagedLog::LOGGER->LogStart("FailedTaskClr2Java::GetFailedTaskSerializedAvro");
                                                  JNIEnv *env = 
RetrieveEnv(_jvm);
 
                                                  jclass jclassFailedTask = 
env->GetObjectClass(_jobjectFailedTask);
-                                                 jmethodID 
jmidGetFailedTaskString = env->GetMethodID(jclassFailedTask, 
"getFailedTaskString", "()Ljava/lang/String;");
+                                                 jmethodID 
jmidGetFailedTaskSerializedAvro = env->GetMethodID(jclassFailedTask, 
"getFailedTaskSerializedAvro", "()[B");
 
-                                                 if (jmidGetFailedTaskString 
== NULL) {
-                                                         
ManagedLog::LOGGER->LogStart("jmidGetFailedTaskString is NULL");
+                                                 if 
(jmidGetFailedTaskSerializedAvro == NULL) {
+                                                         
ManagedLog::LOGGER->LogStart("jmidGetFailedTaskSerializedAvro is NULL");
                                                          return nullptr;
                                                  }
-                                                 jstring jFailedTaskString = 
(jstring)env->CallObjectMethod(
+                                                 jbyteArray 
jFailedTaskSerializedAvro = (jbyteArray)env->CallObjectMethod(
                                                          _jobjectFailedTask,
-                                                         
jmidGetFailedTaskString);
-                                                 
ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetString");
-                                                 return 
ManagedStringFromJavaString(env, jFailedTaskString);
+                                                         
jmidGetFailedTaskSerializedAvro);
+                                                 
ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetFailedTaskSerializedAvro");
+                                                 return 
ManagedByteArrayFromJavaByteArray(env, jFailedTaskSerializedAvro);
                                          }
 
                                          void 
FailedTaskClr2Java::OnError(String^ message) {

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Client/Avro/README.md
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/README.md 
b/lang/cs/Org.Apache.REEF.Client/Avro/README.md
index 185b3de..be8e56e 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/README.md
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/README.md
@@ -7,7 +7,7 @@ unless there is absolutely a reason to!
 
 Instructions On C# Code-Generation
 ----------------------------------
-To code-generate thes files, please use instructions on how 
+To code-generate these files, please use instructions on how 
 to use Microsoft.Hadoop.Avro.Tools.exe as provided 
 
[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/)
 on the files in lang/java/reef-bridge-client/src/avro.

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs 
b/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs
new file mode 100644
index 0000000..43e2653
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Common.Avro
+{
+    /// <summary>
+    /// Used to serialize and deserialize Avro record 
org.apache.reef.javabridge.avro.AvroFailedTask.
+    /// This is a (mostly) auto-generated class. For instructions on how to 
regenerate, please view the README.md in the same folder.
+    /// </summary>
+    [Private]
+    [DataContract(Namespace = "org.apache.reef.javabridge.avro")]
+    public sealed class AvroFailedTask
+    {
+        private const string JsonSchema = 
@"{""type"":""record"",""name"":""org.apache.reef.javabridge.avro.AvroFailedTask"",""doc"":""Defines
 the schema for the failed 
task."",""fields"":[{""name"":""identifier"",""type"":""string""},{""name"":""data"",""type"":""bytes""},{""name"":""cause"",""type"":""bytes""},{""name"":""message"",""type"":""string""}]}";
+
+        /// <summary>
+        /// Gets the schema.
+        /// </summary>
+        public static string Schema
+        {
+            get
+            {
+                return JsonSchema;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the identifier field.
+        /// </summary>
+        [DataMember]
+        public string identifier { get; set; }
+
+        /// <summary>
+        /// Gets or sets the data field.
+        /// </summary>
+        [DataMember]
+        public byte[] data { get; set; }
+
+        /// <summary>
+        /// Gets or sets the cause field.
+        /// </summary>
+        [DataMember]
+        public byte[] cause { get; set; }
+
+        /// <summary>
+        /// Gets or sets the message field.
+        /// </summary>
+        [DataMember]
+        public string message { get; set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AvroFailedTask"/> 
class.
+        /// </summary>
+        public AvroFailedTask()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AvroFailedTask"/> 
class.
+        /// </summary>
+        /// <param name="identifier">The identifier.</param>
+        /// <param name="data">The data.</param>
+        /// <param name="cause">The cause.</param>
+        /// <param name="message">The message.</param>
+        public AvroFailedTask(string identifier, byte[] data, byte[] cause, 
string message)
+        {
+            this.identifier = identifier;
+            this.data = data;
+            this.cause = cause;
+            this.message = message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Avro/README.md
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Avro/README.md 
b/lang/cs/Org.Apache.REEF.Common/Avro/README.md
new file mode 100644
index 0000000..be8e56e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Avro/README.md
@@ -0,0 +1,19 @@
+C# Avro Code-Generated Files
+============================
+The files here are code-generated with modifications. The code
+generation is based on the Avro schemas defined in the folder
+lang/java/reef-bridge-client/src/avro. Please do not modify them 
+unless there is absolutely a reason to!
+
+Instructions On C# Code-Generation
+----------------------------------
+To code-generate these files, please use instructions on how 
+to use Microsoft.Hadoop.Avro.Tools.exe as provided 
+[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/)
+on the files in lang/java/reef-bridge-client/src/avro.
+Note that as of the time of writing (11/10/2015), the 
+Microsoft Azure HDInsight Avro Library NuGet does not include 
+Microsoft.Hadoop.Avro.Tools.exe. To build it directly from source,
+please download the source code 
[here](http://hadoopsdk.codeplex.com/SourceControl/latest) 
+and run msbuild. More information on how to build is available 
+on the official documentation provided above.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 66ea5d9..6cf177c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -57,6 +57,7 @@ under the License.
     <Compile Include="Avro\AvroDriverInfo.cs">
       <ExcludeFromStyleCop>true</ExcludeFromStyleCop>
     </Compile>
+    <Compile Include="Avro\AvroFailedTask.cs" />
     <Compile Include="Avro\AvroHttpRequest.cs">
       <ExcludeFromStyleCop>true</ExcludeFromStyleCop>
     </Compile>
@@ -191,6 +192,7 @@ under the License.
     <Compile Include="Tasks\TaskMessage.cs" />
   </ItemGroup>
   <ItemGroup>
+    <None Include="Avro\README.md" />
     <None Include="Org.Apache.REEF.Common.nuspec" />
     <None Include="packages.config" />
     <None Include="Protobuf\Proto\client_runtime.proto" />

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 573c0de..010c0f9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -17,7 +17,7 @@
 
 using System;
 using System.Globalization;
-using System.Text;
+using System.Linq;
 using System.Threading;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
@@ -92,28 +92,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
             // Send heartbeat such that user receives a TaskRunning message.
             _currentStatus.SetRunning();
-            
+
             return System.Threading.Tasks.Task.Run(() =>
             {
                 Logger.Log(Level.Info, "Calling into user's task.");
                 return _userTask.Call(null);
-            }).ContinueWith(runTask =>
+            }).ContinueWith((System.Threading.Tasks.Task<byte[]> runTask) =>
                 {
                     try
                     {
                         // Task failed.
                         if (runTask.IsFaulted)
                         {
-                            Logger.Log(Level.Warning,
-                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by exception [{0}]", runTask.Exception));
-                            _currentStatus.SetException(runTask.Exception);
+                            OnTaskFailure(runTask);
                             return;
                         }
 
                         if (runTask.IsCanceled)
                         {
-                            Logger.Log(Level.Warning,
-                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by task cancellation"));
+                            Logger.Log(Level.Error,
+                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by System.Threading.Task cancellation"));
+                            OnTaskFailure(runTask);
                             return;
                         }
 
@@ -121,9 +120,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                         var result = runTask.Result;
                         Logger.Log(Level.Info, "Task Call Finished");
                         _currentStatus.SetResult(result);
-                        if (result != null && result.Length > 0)
+
+                        const Level resultLogLevel = Level.Verbose;
+
+                        if (Logger.CustomLevel >= resultLogLevel && result != 
null && result.Length > 0)
                         {
-                            Logger.Log(Level.Info, "Task running result:\r\n" 
+ System.Text.Encoding.Default.GetString(result));
+                            Logger.Log(resultLogLevel, "Task running 
result:\r\n" + System.Text.Encoding.Default.GetString(result));
                         }
                     }
                     finally
@@ -138,6 +140,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 });
         }
 
+        /// <summary>
+        /// Sets the current status of the Task with the Exception it failed 
with.
+        /// </summary>
+        private void OnTaskFailure(System.Threading.Tasks.Task runTask)
+        {
+            if (runTask.Exception == null)
+            {
+                _currentStatus.SetException(new SystemException("Task failed 
without an Exception."));
+            }
+            else
+            {
+                var aggregateException = runTask.Exception.Flatten();
+                _currentStatus.SetException(
+                    aggregateException.InnerExceptions.Count == 1 ?
+                    aggregateException.InnerExceptions.First() : 
aggregateException);
+            }
+        }
+
         public TaskState GetTaskState()
         {
             return _currentStatus.State;

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index 0a4491b..909fc78 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -18,11 +18,11 @@
 using System;
 using System.Collections.Generic;
 using System.Globalization;
+using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Logging;
 
@@ -246,8 +246,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 }
                 else if (_lastException.IsPresent())
                 {
-                    byte[] error = 
ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
-                    taskStatusProto.result = 
ByteUtilities.CopyBytesFrom(error);
+                    var avroFailedTask = new AvroFailedTask
+                    {
+                        identifier = _taskId,
+                        
+                        // TODO[JIRA REEF-1258]: Serialize Exception properly.
+                        cause = new byte[0],
+                        data = 
ByteUtilities.StringToByteArrays(_lastException.Value.ToString()),
+                        message = _lastException.Value.Message
+                    };
+
+                    taskStatusProto.result = 
AvroJsonSerializer<AvroFailedTask>.ToBytes(avroFailedTask);
                 }
                 else if (_state == TaskState.Running)
                 {

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md
new file mode 100644
index 0000000..be8e56e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md
@@ -0,0 +1,19 @@
+C# Avro Code-Generated Files
+============================
+The files here are code-generated with modifications. The code
+generation is based on the Avro schemas defined in the folder
+lang/java/reef-bridge-client/src/avro. Please do not modify them 
+unless there is absolutely a reason to!
+
+Instructions On C# Code-Generation
+----------------------------------
+To code-generate these files, please use instructions on how 
+to use Microsoft.Hadoop.Avro.Tools.exe as provided 
+[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/)
+on the files in lang/java/reef-bridge-client/src/avro.
+Note that as of the time of writing (11/10/2015), the 
+Microsoft Azure HDInsight Avro Library NuGet does not include 
+Microsoft.Hadoop.Avro.Tools.exe. To build it directly from source,
+please download the source code 
[here](http://hadoopsdk.codeplex.com/SourceControl/latest) 
+and run msbuild. More information on how to build is available 
+on the official documentation provided above.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs
index 7e9bc52..194e76e 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs
@@ -24,6 +24,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
     {
         IActiveContextClr2Java GetActiveContext();
 
-        string GetString();
+        byte[] GetFailedTaskSerializedAvro();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs
index fa108b1..56b6a17 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs
@@ -16,13 +16,12 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Driver.Bridge.Clr2java;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.Utilities;
-using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Driver.Bridge.Events
@@ -33,7 +32,18 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
         
         public FailedTask(IFailedTaskClr2Java failedTaskClr2Java)
         {
-            Parse(failedTaskClr2Java);
+            var serializedInfo = 
failedTaskClr2Java.GetFailedTaskSerializedAvro();
+            var avroFailedTask = 
AvroJsonSerializer<AvroFailedTask>.FromBytes(serializedInfo);
+
+            Id = avroFailedTask.identifier;
+            Data = Optional<byte[]>.OfNullable(avroFailedTask.data);
+            Message = avroFailedTask.message ?? "No message in Failed Task.";
+
+            // TODO[JIRA REEF-1258]: Fill this in with avroFailedTask.cause.
+            Cause = Optional<Exception>.Empty();
+
+            // This is always empty, even in Java.
+            Description = Optional<string>.Empty();
             FailedTaskClr2Java = failedTaskClr2Java;
             ActiveContextClr2Java = failedTaskClr2Java.GetActiveContext();
         }
@@ -79,61 +89,5 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
         {
             throw new NotImplementedException();
         }
-
-        private void Parse(IFailedTaskClr2Java failedTaskClr2Java)
-        {
-            string serializedInfo = failedTaskClr2Java.GetString();
-            LOGGER.Log(Level.Verbose, "serialized failed task: " + 
serializedInfo);
-            Dictionary<string, string> settings = new Dictionary<string, 
string>();
-            string[] components = serializedInfo.Split(',');
-            foreach (string component in components)
-            {
-                string[] pair = component.Trim().Split('=');
-                if (pair == null || pair.Length != 2)
-                {
-                    Exceptions.Throw(new ArgumentException("invalid component 
to be used as key-value pair:", component), LOGGER);
-                }
-                settings.Add(pair[0], pair[1]);
-            }
-
-            string id;
-            if (!settings.TryGetValue("Identifier", out id))
-            {
-                Exceptions.Throw(new ArgumentException("cannot find Identifier 
entry."), LOGGER);
-            }
-            Id = id;
-
-            string msg;
-            if (!settings.TryGetValue("Message", out msg))
-            {
-                LOGGER.Log(Level.Verbose, "no Message in Failed Task.");
-                msg = string.Empty;
-            }
-            Message = msg;
-
-            string description;
-            if (!settings.TryGetValue("Description", out description))
-            {
-                LOGGER.Log(Level.Verbose, "no Description in Failed Task.");
-                description = string.Empty;
-            }
-            Description = string.IsNullOrWhiteSpace(description) ? 
Optional<string>.Empty() : Optional<string>.Of(description);
-
-            string cause;
-            if (!settings.TryGetValue("Cause", out cause))
-            {
-                LOGGER.Log(Level.Verbose, "no Cause in Failed Task.");
-                cause = string.Empty;
-            }
-            Reason = string.IsNullOrWhiteSpace(cause) ? 
Optional<string>.Empty() : Optional<string>.Of(cause);
-
-            string rawData;
-            if (!settings.TryGetValue("Data", out rawData))
-            {
-                LOGGER.Log(Level.Verbose, "no Data in Failed Task.");
-                rawData = string.Empty;
-            }
-            Data = string.IsNullOrWhiteSpace(rawData) ? 
Optional<byte[]>.Empty() : 
Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(rawData));
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj 
b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
index 3fe088c..cb40a47 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
+++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj
@@ -151,6 +151,7 @@ under the License.
     <Compile Include="Task\ITaskMessage.cs" />
   </ItemGroup>
   <ItemGroup>
+    <None Include="Bridge\Avro\README.md" />
     <None Include="Org.Apache.REEF.Driver.nuspec" />
     <None Include="packages.config" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
index dbe512c..e49b148 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs
@@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
     public sealed class TestFailedTaskEventHandler : ReefFunctionalTest
     {
         private const string FailedTaskMessage = "I have successfully seen a 
failed task.";
+        private const string ExpectedExceptionMessage = "Expected exception.";
 
         [Fact]
         [Trait("Priority", "1")]
@@ -65,6 +66,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         private sealed class FailedTaskDriver : IObserver<IDriverStarted>, 
IObserver<IAllocatedEvaluator>, 
             IObserver<IFailedTask>, IObserver<ICompletedTask>
         {
+            private const string TaskId = "1234567";
             private static readonly Logger Logger = 
Logger.GetLogger(typeof(FailedTaskDriver));
 
             private readonly IEvaluatorRequestor _requestor;
@@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             public void OnNext(IAllocatedEvaluator value)
             {
                 value.SubmitTask(TaskConfiguration.ConfigurationModule
-                    .Set(TaskConfiguration.Identifier, "1234567")
+                    .Set(TaskConfiguration.Identifier, TaskId)
                     .Set(TaskConfiguration.Task, GenericType<FailTask>.Class)
                     .Build());
             }
@@ -91,6 +93,16 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             public void OnNext(IFailedTask value)
             {
                 Logger.Log(Level.Error, FailedTaskMessage);
+                if (value.Message == null || value.Message != 
ExpectedExceptionMessage)
+                {
+                    throw new Exception("Exception message not properly 
propagated. Received message " + value.Message);
+                }
+
+                if (value.Id != TaskId)
+                {
+                    throw new Exception("Received Task ID " + value.Id + " 
instead of the expected Task ID " + TaskId);
+                }
+
                 value.GetActiveContext().Value.Dispose();
             }
 
@@ -123,7 +135,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
 
             public byte[] Call(byte[] memento)
             {
-                throw new Exception("Expected exception.");
+                throw new Exception(ExpectedExceptionMessage);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc 
b/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc
new file mode 100644
index 0000000..1b75efd
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ [
+/*
+ * Defines the schema for Bridge classes such that information can be passed 
from C# Evaluator to C# Driver.
+ */
+ {
+    "namespace":"org.apache.reef.javabridge.avro",
+    "type":"record",
+    "name":"AvroFailedTask",
+    "doc":"Defines the schema for failed task. Tunnels Task failures from C# 
Evaluator to Java Driver to C# Driver.",
+    "fields":[
+      {
+        "name":"identifier",
+        "doc":"The Task ID of the failed Task.",
+        "type":"string"
+      },
+      {
+        "name":"data",
+        "doc":"The data passed back from the Failed Task, if any.",
+        "type":"bytes"
+      },
+      {
+        "name":"cause",
+        "doc":"The serialized Exception that caused the Task failure.",
+        "type":"bytes"
+      },
+      {
+        "name":"message",
+        "doc":"The message of the Task failure, if any.",
+        "type":"string"
+      }
+    ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
index d80d6df..0f2221a 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
@@ -18,12 +18,19 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.avro.io.*;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.reef.annotations.audience.Interop;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.javabridge.avro.AvroFailedTask;
 
-import java.nio.charset.StandardCharsets;
-import java.util.logging.Level;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.logging.Logger;
 
 /**
@@ -36,8 +43,9 @@ import java.util.logging.Logger;
 public final class FailedTaskBridge extends NativeBridge {
   private static final Logger LOG = 
Logger.getLogger(FailedTaskBridge.class.getName());
 
-  private FailedTask jfailedTask;
-  private ActiveContextBridge jactiveContext;
+  private final FailedTask jfailedTask;
+  private final ActiveContextBridge jactiveContext;
+  private final byte[] failedTaskSerializedAvro;
 
   public FailedTaskBridge(final FailedTask failedTask, final 
ActiveContextBridgeFactory factory) {
     this.jfailedTask = failedTask;
@@ -46,29 +54,60 @@ public final class FailedTaskBridge extends NativeBridge {
     } else {
       this.jactiveContext = null;
     }
+
+    try {
+      this.failedTaskSerializedAvro = generateFailedTaskSerializedAvro();
+    } catch(final Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   public ActiveContextBridge getActiveContext() {
     return jactiveContext;
   }
 
-  public String getFailedTaskString() {
-    final String description = jfailedTask.getDescription().isPresent() ?
-        jfailedTask.getDescription().get().replace("=", "").replace(",", "") : 
"";
-    final String cause = jfailedTask.getReason().isPresent() ?
-        jfailedTask.getReason().get().toString().replace("=", "").replace(",", 
"") : "";
-    final String data = jfailedTask.getData().isPresent() ?
-        new String(jfailedTask.getData().get(), 
StandardCharsets.UTF_8).replace("=", "").replace(",", "") : "";
+  public byte[] getFailedTaskSerializedAvro() {
+    return failedTaskSerializedAvro;
+  }
+
+  private byte[] generateFailedTaskSerializedAvro() throws IOException {
+    AvroFailedTask avroFailedTask = null;
+
+    if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) {
+      // Deserialize what was passed in from C#.
+      try (final ByteArrayInputStream fileInputStream = new 
ByteArrayInputStream(jfailedTask.getData().get())) {
+        final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
+            AvroFailedTask.getClassSchema(), fileInputStream);
+        final SpecificDatumReader<AvroFailedTask> reader =
+            new SpecificDatumReader<>(AvroFailedTask.class);
+        avroFailedTask = reader.read(null, decoder);
+      }
+    } else {
+      // This may result from a failed Evaluator.
+      avroFailedTask = AvroFailedTask.newBuilder()
+          .setIdentifier(jfailedTask.getId())
+          .setCause(ByteBuffer.wrap(new byte[0]))
+          .setData(ByteBuffer.wrap(new byte[0]))
+          .setMessage("")
+          .build();
+    }
 
-    // TODO[JIRA REEF-796]: deserialize/serialize with proper Avro schema
-    final String poorSerializedString = "Identifier=" + 
jfailedTask.getId().replace("=", "").replace(",", "")
-        + ", Message=" + jfailedTask.getMessage().replace("=", 
"").replace(",", "")
-        + ", Description=" + description
-        + ", Cause=" + cause
-        + ", Data=" + data;
+    // Overwrite the message if Java provides a message and C# does not.
+    // Typically the case for failed Evaluators.
+    if (StringUtils.isNoneBlank(jfailedTask.getMessage()) &&
+        StringUtils.isBlank(avroFailedTask.getMessage().toString())) {
+      avroFailedTask.setMessage(jfailedTask.getMessage());
+    }
 
-    LOG.log(Level.INFO, "serialized failed task " + poorSerializedString);
-    return poorSerializedString;
+    final DatumWriter<AvroFailedTask> datumWriter = new 
SpecificDatumWriter<>(AvroFailedTask.class);
+
+    try (final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream()) {
+      final JsonEncoder encoder = 
EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream);
+      datumWriter.write(avroFailedTask, encoder);
+      encoder.flush();
+      outputStream.flush();
+      return outputStream.toByteArray();
+    }
   }
 
   @Override

Reply via email to