Repository: reef Updated Branches: refs/heads/master 1e62c9dcc -> ffa976a70
[REEF-796] Do Avro (de)serialization for FailedTask in bridge This addressed the issue by * Add AvroFailedTask. * Serialize and deserialize info from C# properly without using ad-hoc serialization. JIRA: [REEF-796](https://issues.apache.org/jira/browse/REEF-796) Pull request: This closes #960 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ffa976a7 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ffa976a7 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ffa976a7 Branch: refs/heads/master Commit: ffa976a70643641608405e2e221d5cf7b0d02388 Parents: 1e62c9d Author: Andrew Chung <[email protected]> Authored: Tue Apr 19 17:20:39 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Apr 25 13:20:22 2016 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 2 +- .../FailedTaskClr2Java.cpp | 24 +++--- lang/cs/Org.Apache.REEF.Client/Avro/README.md | 2 +- .../Avro/AvroFailedTask.cs | 90 ++++++++++++++++++++ lang/cs/Org.Apache.REEF.Common/Avro/README.md | 19 +++++ .../Org.Apache.REEF.Common.csproj | 2 + .../Runtime/Evaluator/Task/TaskRuntime.cs | 40 ++++++--- .../Runtime/Evaluator/Task/TaskStatus.cs | 15 +++- .../Bridge/Avro/README.md | 19 +++++ .../Bridge/Clr2java/IFailedTaskClr2Java.cs | 2 +- .../Bridge/Events/FailedTask.cs | 72 +++------------- .../Org.Apache.REEF.Driver.csproj | 1 + .../Bridge/TestFailedTaskEventHandler.cs | 16 +++- .../reef-bridge-java/src/main/avro/Bridge.avsc | 51 +++++++++++ .../reef/javabridge/FailedTaskBridge.java | 77 ++++++++++++----- 15 files changed, 324 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index d7eb713..d00ae59 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -128,7 +128,7 @@ namespace Org { !FailedTaskClr2Java(); virtual void OnError(String^ message); virtual IActiveContextClr2Java^ GetActiveContext(); - virtual String^ GetString(); + virtual array<byte>^ GetFailedTaskSerializedAvro(); }; public ref class RunningTaskClr2Java : public IRunningTaskClr2Java { http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp index 887c7a9..91443c2 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/FailedTaskClr2Java.cpp @@ -60,29 +60,29 @@ namespace Org { ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetActiveContext"); - if (jobjectActiveContext == NULL) { - return nullptr; - } + if (jobjectActiveContext == NULL) { + return nullptr; + } return gcnew ActiveContextClr2Java(env, jobjectActiveContext); } - String^ FailedTaskClr2Java::GetString() { - ManagedLog::LOGGER->LogStart("FailedTaskClr2Java::GetString"); + array<byte>^ FailedTaskClr2Java::GetFailedTaskSerializedAvro() { + ManagedLog::LOGGER->LogStart("FailedTaskClr2Java::GetFailedTaskSerializedAvro"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassFailedTask = env->GetObjectClass(_jobjectFailedTask); - jmethodID jmidGetFailedTaskString = env->GetMethodID(jclassFailedTask, "getFailedTaskString", "()Ljava/lang/String;"); + jmethodID jmidGetFailedTaskSerializedAvro = env->GetMethodID(jclassFailedTask, "getFailedTaskSerializedAvro", "()[B"); - if (jmidGetFailedTaskString == NULL) { - ManagedLog::LOGGER->LogStart("jmidGetFailedTaskString is NULL"); + if 
(jmidGetFailedTaskSerializedAvro == NULL) { + ManagedLog::LOGGER->LogStart("jmidGetFailedTaskSerializedAvro is NULL"); return nullptr; } - jstring jFailedTaskString = (jstring)env->CallObjectMethod( + jbyteArray jFailedTaskSerializedAvro = (jbyteArray)env->CallObjectMethod( _jobjectFailedTask, - jmidGetFailedTaskString); - ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetString"); - return ManagedStringFromJavaString(env, jFailedTaskString); + jmidGetFailedTaskSerializedAvro); + ManagedLog::LOGGER->LogStop("FailedTaskClr2Java::GetFailedTaskSerializedAvro"); + return ManagedByteArrayFromJavaByteArray(env, jFailedTaskSerializedAvro); } void FailedTaskClr2Java::OnError(String^ message) { http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Client/Avro/README.md ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/README.md b/lang/cs/Org.Apache.REEF.Client/Avro/README.md index 185b3de..be8e56e 100644 --- a/lang/cs/Org.Apache.REEF.Client/Avro/README.md +++ b/lang/cs/Org.Apache.REEF.Client/Avro/README.md @@ -7,7 +7,7 @@ unless there is absolutely a reason to! Instructions On C# Code-Generation ---------------------------------- -To code-generate thes files, please use instructions on how +To code-generate these files, please use instructions on how to use Microsoft.Hadoop.Avro.Tools.exe as provided [here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/) on the files in lang/java/reef-bridge-client/src/avro. 
http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs b/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs new file mode 100644 index 0000000..43e2653 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Avro/AvroFailedTask.cs @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Runtime.Serialization; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Common.Avro +{ + /// <summary> + /// Used to serialize and deserialize Avro record org.apache.reef.javabridge.avro.AvroFailedTask. + /// This is a (mostly) auto-generated class. For instructions on how to regenerate, please view the README.md in the same folder. 
+ /// </summary> + [Private] + [DataContract(Namespace = "org.apache.reef.javabridge.avro")] + public sealed class AvroFailedTask + { + private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.javabridge.avro.AvroFailedTask"",""doc"":""Defines the schema for the failed task."",""fields"":[{""name"":""identifier"",""type"":""string""},{""name"":""data"",""type"":""bytes""},{""name"":""cause"",""type"":""bytes""},{""name"":""message"",""type"":""string""}]}"; + + /// <summary> + /// Gets the schema. + /// </summary> + public static string Schema + { + get + { + return JsonSchema; + } + } + + /// <summary> + /// Gets or sets the identifier field. + /// </summary> + [DataMember] + public string identifier { get; set; } + + /// <summary> + /// Gets or sets the data field. + /// </summary> + [DataMember] + public byte[] data { get; set; } + + /// <summary> + /// Gets or sets the cause field. + /// </summary> + [DataMember] + public byte[] cause { get; set; } + + /// <summary> + /// Gets or sets the message field. + /// </summary> + [DataMember] + public string message { get; set; } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroFailedTask"/> class. + /// </summary> + public AvroFailedTask() + { + } + + /// <summary> + /// Initializes a new instance of the <see cref="AvroFailedTask"/> class. 
+ /// </summary> + /// <param name="identifier">The identifier.</param> + /// <param name="data">The data.</param> + /// <param name="cause">The cause.</param> + /// <param name="message">The message.</param> + public AvroFailedTask(string identifier, byte[] data, byte[] cause, string message) + { + this.identifier = identifier; + this.data = data; + this.cause = cause; + this.message = message; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Avro/README.md ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Avro/README.md b/lang/cs/Org.Apache.REEF.Common/Avro/README.md new file mode 100644 index 0000000..be8e56e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Avro/README.md @@ -0,0 +1,19 @@ +C# Avro Code-Generated Files +============================ +The files here are code-generated with modifications. The code +generation is based on Avro schemas defined under lang/java, e.g. +lang/java/reef-bridge-java/src/main/avro. Please do not modify them +unless absolutely necessary! + +Instructions On C# Code-Generation +---------------------------------- +To code-generate these files, please use instructions on how +to use Microsoft.Hadoop.Avro.Tools.exe as provided +[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/) +on the corresponding .avsc schema files under lang/java. +Note that as of the time of writing (11/10/2015), the +Microsoft Azure HDInsight Avro Library NuGet does not include +Microsoft.Hadoop.Avro.Tools.exe. To build it directly from source, +please download the source code [here](http://hadoopsdk.codeplex.com/SourceControl/latest) +and run msbuild. More information on how to build is available +on the official documentation provided above.
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 66ea5d9..6cf177c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -57,6 +57,7 @@ under the License. <Compile Include="Avro\AvroDriverInfo.cs"> <ExcludeFromStyleCop>true</ExcludeFromStyleCop> </Compile> + <Compile Include="Avro\AvroFailedTask.cs" /> <Compile Include="Avro\AvroHttpRequest.cs"> <ExcludeFromStyleCop>true</ExcludeFromStyleCop> </Compile> @@ -191,6 +192,7 @@ under the License. <Compile Include="Tasks\TaskMessage.cs" /> </ItemGroup> <ItemGroup> + <None Include="Avro\README.md" /> <None Include="Org.Apache.REEF.Common.nuspec" /> <None Include="packages.config" /> <None Include="Protobuf\Proto\client_runtime.proto" /> http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 573c0de..010c0f9 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -17,7 +17,7 @@ using System; using System.Globalization; -using System.Text; +using System.Linq; using System.Threading; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; @@ -92,28 +92,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // Send heartbeat such that user receives a TaskRunning message. 
_currentStatus.SetRunning(); - + return System.Threading.Tasks.Task.Run(() => { Logger.Log(Level.Info, "Calling into user's task."); return _userTask.Call(null); - }).ContinueWith(runTask => + }).ContinueWith((System.Threading.Tasks.Task<byte[]> runTask) => { try { // Task failed. if (runTask.IsFaulted) { - Logger.Log(Level.Warning, - string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", runTask.Exception)); - _currentStatus.SetException(runTask.Exception); + OnTaskFailure(runTask); return; } if (runTask.IsCanceled) { - Logger.Log(Level.Warning, - string.Format(CultureInfo.InvariantCulture, "Task failed caused by task cancellation")); + Logger.Log(Level.Error, + string.Format(CultureInfo.InvariantCulture, "Task failed caused by System.Threading.Task cancellation")); + OnTaskFailure(runTask); return; } @@ -121,9 +120,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task var result = runTask.Result; Logger.Log(Level.Info, "Task Call Finished"); _currentStatus.SetResult(result); - if (result != null && result.Length > 0) + + const Level resultLogLevel = Level.Verbose; + + if (Logger.CustomLevel >= resultLogLevel && result != null && result.Length > 0) { - Logger.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); + Logger.Log(resultLogLevel, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); } } finally @@ -138,6 +140,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task }); } + /// <summary> + /// Sets the current status of the Task with the Exception it failed with. + /// </summary> + private void OnTaskFailure(System.Threading.Tasks.Task runTask) + { + if (runTask.Exception == null) + { + _currentStatus.SetException(new SystemException("Task failed without an Exception.")); + } + else + { + var aggregateException = runTask.Exception.Flatten(); + _currentStatus.SetException( + aggregateException.InnerExceptions.Count == 1 ? 
+ aggregateException.InnerExceptions.First() : aggregateException); + } + } + public TaskState GetTaskState() { return _currentStatus.State; http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 0a4491b..909fc78 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -18,11 +18,11 @@ using System; using System.Collections.Generic; using System.Globalization; +using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -246,8 +246,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } else if (_lastException.IsPresent()) { - byte[] error = ByteUtilities.StringToByteArrays(_lastException.Value.ToString()); - taskStatusProto.result = ByteUtilities.CopyBytesFrom(error); + var avroFailedTask = new AvroFailedTask + { + identifier = _taskId, + + // TODO[JIRA REEF-1258]: Serialize Exception properly. 
+ cause = new byte[0], + data = ByteUtilities.StringToByteArrays(_lastException.Value.ToString()), + message = _lastException.Value.Message + }; + + taskStatusProto.result = AvroJsonSerializer<AvroFailedTask>.ToBytes(avroFailedTask); } else if (_state == TaskState.Running) { http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md new file mode 100644 index 0000000..be8e56e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Avro/README.md @@ -0,0 +1,19 @@ +C# Avro Code-Generated Files +============================ +The files here are code-generated with modifications. The code +generation is based on the Avro schemas defined in the folder +lang/java/reef-bridge-client/src/avro. Please do not modify them +unless there is absolutely a reason to! + +Instructions On C# Code-Generation +---------------------------------- +To code-generate these files, please use instructions on how +to use Microsoft.Hadoop.Avro.Tools.exe as provided +[here](https://azure.microsoft.com/en-us/documentation/articles/hdinsight-dotnet-avro-serialization/) +on the files in lang/java/reef-bridge-client/src/avro. +Note that as of the time of writing (11/10/2015), the +Microsoft Azure HDInsight Avro Library NuGet does not include +Microsoft.Hadoop.Avro.Tools.exe. To build it directly from source, +please download the source code [here](http://hadoopsdk.codeplex.com/SourceControl/latest) +and run msbuild. More information on how to build is available +on the official documentation provided above. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs index 7e9bc52..194e76e 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IFailedTaskClr2Java.cs @@ -24,6 +24,6 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java { IActiveContextClr2Java GetActiveContext(); - string GetString(); + byte[] GetFailedTaskSerializedAvro(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs index fa108b1..56b6a17 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/FailedTask.cs @@ -16,13 +16,12 @@ // under the License. 
using System; -using System.Collections.Generic; using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Avro; using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Driver.Bridge.Events @@ -33,7 +32,18 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public FailedTask(IFailedTaskClr2Java failedTaskClr2Java) { - Parse(failedTaskClr2Java); + var serializedInfo = failedTaskClr2Java.GetFailedTaskSerializedAvro(); + var avroFailedTask = AvroJsonSerializer<AvroFailedTask>.FromBytes(serializedInfo); + + Id = avroFailedTask.identifier; + Data = Optional<byte[]>.OfNullable(avroFailedTask.data); + Message = avroFailedTask.message ?? "No message in Failed Task."; + + // TODO[JIRA REEF-1258]: Fill this in with avroFailedTask.cause. + Cause = Optional<Exception>.Empty(); + + // This is always empty, even in Java. 
+ Description = Optional<string>.Empty(); FailedTaskClr2Java = failedTaskClr2Java; ActiveContextClr2Java = failedTaskClr2Java.GetActiveContext(); } @@ -79,61 +89,5 @@ namespace Org.Apache.REEF.Driver.Bridge.Events { throw new NotImplementedException(); } - - private void Parse(IFailedTaskClr2Java failedTaskClr2Java) - { - string serializedInfo = failedTaskClr2Java.GetString(); - LOGGER.Log(Level.Verbose, "serialized failed task: " + serializedInfo); - Dictionary<string, string> settings = new Dictionary<string, string>(); - string[] components = serializedInfo.Split(','); - foreach (string component in components) - { - string[] pair = component.Trim().Split('='); - if (pair == null || pair.Length != 2) - { - Exceptions.Throw(new ArgumentException("invalid component to be used as key-value pair:", component), LOGGER); - } - settings.Add(pair[0], pair[1]); - } - - string id; - if (!settings.TryGetValue("Identifier", out id)) - { - Exceptions.Throw(new ArgumentException("cannot find Identifier entry."), LOGGER); - } - Id = id; - - string msg; - if (!settings.TryGetValue("Message", out msg)) - { - LOGGER.Log(Level.Verbose, "no Message in Failed Task."); - msg = string.Empty; - } - Message = msg; - - string description; - if (!settings.TryGetValue("Description", out description)) - { - LOGGER.Log(Level.Verbose, "no Description in Failed Task."); - description = string.Empty; - } - Description = string.IsNullOrWhiteSpace(description) ? Optional<string>.Empty() : Optional<string>.Of(description); - - string cause; - if (!settings.TryGetValue("Cause", out cause)) - { - LOGGER.Log(Level.Verbose, "no Cause in Failed Task."); - cause = string.Empty; - } - Reason = string.IsNullOrWhiteSpace(cause) ? Optional<string>.Empty() : Optional<string>.Of(cause); - - string rawData; - if (!settings.TryGetValue("Data", out rawData)) - { - LOGGER.Log(Level.Verbose, "no Data in Failed Task."); - rawData = string.Empty; - } - Data = string.IsNullOrWhiteSpace(rawData) ? 
Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(rawData)); - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj index 3fe088c..cb40a47 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj +++ b/lang/cs/Org.Apache.REEF.Driver/Org.Apache.REEF.Driver.csproj @@ -151,6 +151,7 @@ under the License. <Compile Include="Task\ITaskMessage.cs" /> </ItemGroup> <ItemGroup> + <None Include="Bridge\Avro\README.md" /> <None Include="Org.Apache.REEF.Driver.nuspec" /> <None Include="packages.config" /> </ItemGroup> http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs index dbe512c..e49b148 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs @@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public sealed class TestFailedTaskEventHandler : ReefFunctionalTest { private const string FailedTaskMessage = "I have successfully seen a failed task."; + private const string ExpectedExceptionMessage = "Expected exception."; [Fact] [Trait("Priority", "1")] @@ -65,6 +66,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge private sealed class FailedTaskDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, IObserver<IFailedTask>, IObserver<ICompletedTask> { + private const string TaskId = 
"1234567"; private static readonly Logger Logger = Logger.GetLogger(typeof(FailedTaskDriver)); private readonly IEvaluatorRequestor _requestor; @@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void OnNext(IAllocatedEvaluator value) { value.SubmitTask(TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, "1234567") + .Set(TaskConfiguration.Identifier, TaskId) .Set(TaskConfiguration.Task, GenericType<FailTask>.Class) .Build()); } @@ -91,6 +93,16 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void OnNext(IFailedTask value) { Logger.Log(Level.Error, FailedTaskMessage); + if (value.Message == null || value.Message != ExpectedExceptionMessage) + { + throw new Exception("Exception message not properly propagated. Received message " + value.Message); + } + + if (value.Id != TaskId) + { + throw new Exception("Received Task ID " + value.Id + " instead of the expected Task ID " + TaskId); + } + value.GetActiveContext().Value.Dispose(); } @@ -123,7 +135,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public byte[] Call(byte[] memento) { - throw new Exception("Expected exception."); + throw new Exception(ExpectedExceptionMessage); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc b/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc new file mode 100644 index 0000000..1b75efd --- /dev/null +++ b/lang/java/reef-bridge-java/src/main/avro/Bridge.avsc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + [ +/* + * Defines the schema for Bridge classes such that information can be passed from C# Evaluator to C# Driver. + */ + { + "namespace":"org.apache.reef.javabridge.avro", + "type":"record", + "name":"AvroFailedTask", + "doc":"Defines the schema for a failed Task. Tunnels Task failures from C# Evaluator to Java Driver to C# Driver.", + "fields":[ + { + "name":"identifier", + "doc":"The Task ID of the failed Task.", + "type":"string" + }, + { + "name":"data", + "doc":"The data passed back from the Failed Task, if any.", + "type":"bytes" + }, + { + "name":"cause", + "doc":"The serialized Exception that caused the Task failure.", + "type":"bytes" + }, + { + "name":"message", + "doc":"The message of the Task failure, if any.", + "type":"string" + } + ] + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ffa976a7/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java index d80d6df..0f2221a 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java +++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java @@ -18,12 +18,19 @@ */ package org.apache.reef.javabridge; +import org.apache.avro.io.*; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.commons.lang3.StringUtils; import org.apache.reef.annotations.audience.Interop; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.task.FailedTask; +import org.apache.reef.javabridge.avro.AvroFailedTask; -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.logging.Logger; /** @@ -36,8 +43,9 @@ import java.util.logging.Logger; public final class FailedTaskBridge extends NativeBridge { private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName()); - private FailedTask jfailedTask; - private ActiveContextBridge jactiveContext; + private final FailedTask jfailedTask; + private final ActiveContextBridge jactiveContext; + private final byte[] failedTaskSerializedAvro; public FailedTaskBridge(final FailedTask failedTask, final ActiveContextBridgeFactory factory) { this.jfailedTask = failedTask; @@ -46,29 +54,60 @@ public final class FailedTaskBridge extends NativeBridge { } else { this.jactiveContext = null; } + + try { + this.failedTaskSerializedAvro = generateFailedTaskSerializedAvro(); + } catch(final Exception e) { + throw new RuntimeException(e); + } } public ActiveContextBridge getActiveContext() { return jactiveContext; } - public String getFailedTaskString() { - final String description = jfailedTask.getDescription().isPresent() ? - jfailedTask.getDescription().get().replace("=", "").replace(",", "") : ""; - final String cause = jfailedTask.getReason().isPresent() ? 
- jfailedTask.getReason().get().toString().replace("=", "").replace(",", "") : ""; - final String data = jfailedTask.getData().isPresent() ? - new String(jfailedTask.getData().get(), StandardCharsets.UTF_8).replace("=", "").replace(",", "") : ""; + public byte[] getFailedTaskSerializedAvro() { + return failedTaskSerializedAvro; + } + + private byte[] generateFailedTaskSerializedAvro() throws IOException { + AvroFailedTask avroFailedTask = null; + + if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) { + // Deserialize what was passed in from C#. + try (final ByteArrayInputStream fileInputStream = new ByteArrayInputStream(jfailedTask.getData().get())) { + final JsonDecoder decoder = DecoderFactory.get().jsonDecoder( + AvroFailedTask.getClassSchema(), fileInputStream); + final SpecificDatumReader<AvroFailedTask> reader = + new SpecificDatumReader<>(AvroFailedTask.class); + avroFailedTask = reader.read(null, decoder); + } + } else { + // This may result from a failed Evaluator. + avroFailedTask = AvroFailedTask.newBuilder() + .setIdentifier(jfailedTask.getId()) + .setCause(ByteBuffer.wrap(new byte[0])) + .setData(ByteBuffer.wrap(new byte[0])) + .setMessage("") + .build(); + } - // TODO[JIRA REEF-796]: deserialize/serialize with proper Avro schema - final String poorSerializedString = "Identifier=" + jfailedTask.getId().replace("=", "").replace(",", "") - + ", Message=" + jfailedTask.getMessage().replace("=", "").replace(",", "") - + ", Description=" + description - + ", Cause=" + cause - + ", Data=" + data; + // Overwrite the message if Java provides a message and C# does not. + // Typically the case for failed Evaluators. 
+ if (StringUtils.isNoneBlank(jfailedTask.getMessage()) && + StringUtils.isBlank(avroFailedTask.getMessage().toString())) { + avroFailedTask.setMessage(jfailedTask.getMessage()); + } - LOG.log(Level.INFO, "serialized failed task " + poorSerializedString); - return poorSerializedString; + final DatumWriter<AvroFailedTask> datumWriter = new SpecificDatumWriter<>(AvroFailedTask.class); + + try (final ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { + final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream); + datumWriter.write(avroFailedTask, encoder); + encoder.flush(); + outputStream.flush(); + return outputStream.toByteArray(); + } } @Override
