[FLINK-7703] Port JobExceptionsHandler to new REST endpoint

This closes #4834.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb528a11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb528a11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb528a11

Branch: refs/heads/master
Commit: cb528a114b4f4bac04620f0dd6aeead773de0d0e
Parents: e14f2db
Author: zjureel <[email protected]>
Authored: Wed Oct 18 11:39:49 2017 +0800
Committer: zentol <[email protected]>
Committed: Wed Oct 18 12:51:00 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../rest/handler/job/JobExceptionsHandler.java  | 100 +++++++++++++
 .../rest/messages/JobExceptionsHeaders.java     |  70 +++++++++
 .../rest/messages/JobExceptionsInfo.java        | 141 +++++++++++++++++++
 .../messages/JobExceptionsInfoNoRootTest.java   |  52 +++++++
 .../rest/messages/JobExceptionsInfoTest.java    |  52 +++++++
 6 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index c23bb98..ac4897b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
@@ -53,6 +54,7 @@ import 
org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
@@ -207,6 +209,14 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        executor,
                        checkpointStatsCache);
 
+               JobExceptionsHandler jobExceptionsHandler = new 
JobExceptionsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       JobExceptionsHeaders.getInstance(),
+                       executionGraphCache,
+                       executor);
+
                final File tmpDir = restConfiguration.getTmpDir();
 
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
@@ -233,6 +243,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), 
checkpointStatisticDetailsHandler));
                handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), 
jobPlanHandler));
                
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), 
taskCheckpointStatisticDetailsHandler));
+               handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), 
jobExceptionsHandler));
 
                BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
                
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
new file mode 100644
index 0000000..feabbea
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler serving the job exceptions.
+ */
+public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> {
+
+       static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+       public JobExceptionsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       MessageHeaders<EmptyRequestBody, JobExceptionsInfo, 
JobMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor) {
+
+               super(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       messageHeaders,
+                       executionGraphCache,
+                       executor);
+       }
+
+       @Override
+       protected JobExceptionsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) {
+               ErrorInfo rootException = executionGraph.getFailureCause();
+               String rootExceptionMessage = null;
+               Long rootTimestamp = null;
+               if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                       rootExceptionMessage = 
rootException.getExceptionAsString();
+                       rootTimestamp = rootException.getTimestamp();
+               }
+
+               List<JobExceptionsInfo.ExecutionExceptionInfo> 
taskExceptionList = new ArrayList<>();
+               boolean truncated = false;
+               for (AccessExecutionVertex task : 
executionGraph.getAllExecutionVertices()) {
+                       String t = task.getFailureCauseAsString();
+                       if (t != null && 
!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                               if (taskExceptionList.size() >= 
MAX_NUMBER_EXCEPTION_TO_REPORT) {
+                                       truncated = true;
+                                       break;
+                               }
+
+                               TaskManagerLocation location = 
task.getCurrentAssignedResourceLocation();
+                               String locationString = location != null ?
+                                       location.getFQDNHostname() + ':' + 
location.dataPort() : "(unassigned)";
+                               long timestamp = 
task.getStateTimestamp(ExecutionState.FAILED);
+                               taskExceptionList.add(new 
JobExceptionsInfo.ExecutionExceptionInfo(
+                                       t,
+                                       task.getTaskNameWithSubtaskIndex(),
+                                       locationString,
+                                       timestamp == 0 ? -1 : timestamp));
+                       }
+               }
+
+               return new JobExceptionsInfo(rootExceptionMessage, 
rootTimestamp, taskExceptionList, truncated);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
new file mode 100644
index 0000000..7b924b3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobExceptionsHandler}.
+ */
+public class JobExceptionsHeaders implements MessageHeaders<EmptyRequestBody, 
JobExceptionsInfo, JobMessageParameters> {
+
+       private static final JobExceptionsHeaders INSTANCE = new 
JobExceptionsHeaders();
+
+       public static final String URL = "/jobs/:jobid/exceptions";
+
+       private JobExceptionsHeaders() {}
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<JobExceptionsInfo> getResponseClass() {
+               return JobExceptionsInfo.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public JobMessageParameters getUnresolvedMessageParameters() {
+               return new JobMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static JobExceptionsHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
new file mode 100644
index 0000000..83b1134
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobExceptionsHandler}.
+ */
+public class JobExceptionsInfo implements ResponseBody {
+
+       public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception";
+       public static final String FIELD_NAME_TIMESTAMP = "timestamp";
+       public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions";
+       public static final String FIELD_NAME_TRUNCATED = "truncated";
+
+       @JsonProperty(FIELD_NAME_ROOT_EXCEPTION)
+       private final String rootException;
+
+       @JsonProperty(FIELD_NAME_TIMESTAMP)
+       private final Long rootTimestamp;
+
+       @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS)
+       private final List<ExecutionExceptionInfo> allExceptions;
+
+       @JsonProperty(FIELD_NAME_TRUNCATED)
+       private final boolean truncated;
+
+       @JsonCreator
+       public JobExceptionsInfo(
+               @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
+               @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
+               @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) 
List<ExecutionExceptionInfo> allExceptions,
+               @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) {
+               this.rootException = rootException;
+               this.rootTimestamp = rootTimestamp;
+               this.allExceptions = Preconditions.checkNotNull(allExceptions);
+               this.truncated = truncated;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               JobExceptionsInfo that = (JobExceptionsInfo) o;
+               return truncated == that.truncated &&
+                       Objects.equals(rootException, that.rootException) &&
+                       Objects.equals(rootTimestamp, that.rootTimestamp) &&
+                       Objects.equals(allExceptions, that.allExceptions);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(rootException, rootTimestamp, 
allExceptions, truncated);
+       }
+
+       //---------------------------------------------------------------------------------
+       // Static helper classes
+       //---------------------------------------------------------------------------------
+
+       /**
+        * Nested class to encapsulate the task execution exception.
+        */
+       public static final class ExecutionExceptionInfo {
+               public static final String FIELD_NAME_EXCEPTION = "exception";
+               public static final String FIELD_NAME_TASK = "task";
+               public static final String FIELD_NAME_LOCATION = "location";
+               public static final String FIELD_NAME_TIMESTAMP = "timestamp";
+
+               @JsonProperty(FIELD_NAME_EXCEPTION)
+               private final String exception;
+
+               @JsonProperty(FIELD_NAME_TASK)
+               private final String task;
+
+               @JsonProperty(FIELD_NAME_LOCATION)
+               private final String location;
+
+               @JsonProperty(FIELD_NAME_TIMESTAMP)
+               private final long timestamp;
+
+               @JsonCreator
+               public ExecutionExceptionInfo(
+                       @JsonProperty(FIELD_NAME_EXCEPTION) String exception,
+                       @JsonProperty(FIELD_NAME_TASK) String task,
+                       @JsonProperty(FIELD_NAME_LOCATION) String location,
+                       @JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp) {
+                       this.exception = Preconditions.checkNotNull(exception);
+                       this.task = Preconditions.checkNotNull(task);
+                       this.location = Preconditions.checkNotNull(location);
+                       this.timestamp = timestamp;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       JobExceptionsInfo.ExecutionExceptionInfo that = 
(JobExceptionsInfo.ExecutionExceptionInfo) o;
+                       return timestamp == that.timestamp &&
+                               Objects.equals(exception, that.exception) &&
+                               Objects.equals(task, that.task) &&
+                               Objects.equals(location, that.location);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(timestamp, exception, task, 
location);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
new file mode 100644
index 0000000..a3c23f0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoNoRootTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobExceptionsInfo} with no root exception can be marshalled and unmarshalled.
+ */
+public class JobExceptionsInfoNoRootTest extends 
RestResponseMarshallingTestBase<JobExceptionsInfo> {
+       @Override
+       protected Class<JobExceptionsInfo> getTestResponseClass() {
+               return JobExceptionsInfo.class;
+       }
+
+       @Override
+       protected JobExceptionsInfo getTestResponseInstance() throws Exception {
+               List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList = new ArrayList<>();
+               executionTaskExceptionInfoList.add(new 
JobExceptionsInfo.ExecutionExceptionInfo(
+                       "exception1",
+                       "task1",
+                       "location1",
+                       System.currentTimeMillis()));
+               executionTaskExceptionInfoList.add(new 
JobExceptionsInfo.ExecutionExceptionInfo(
+                       "exception2",
+                       "task2",
+                       "location2",
+                       System.currentTimeMillis()));
+               return new JobExceptionsInfo(
+                       null,
+                       null,
+                       executionTaskExceptionInfoList,
+                       false);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb528a11/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
new file mode 100644
index 0000000..b8f3baa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests that the {@link JobExceptionsInfo} can be marshalled and unmarshalled.
+ */
+public class JobExceptionsInfoTest extends 
RestResponseMarshallingTestBase<JobExceptionsInfo>  {
+       @Override
+       protected Class<JobExceptionsInfo> getTestResponseClass() {
+               return JobExceptionsInfo.class;
+       }
+
+       @Override
+       protected JobExceptionsInfo getTestResponseInstance() throws Exception {
+               List<JobExceptionsInfo.ExecutionExceptionInfo> 
executionTaskExceptionInfoList = new ArrayList<>();
+               executionTaskExceptionInfoList.add(new 
JobExceptionsInfo.ExecutionExceptionInfo(
+                       "exception1",
+                       "task1",
+                       "location1",
+                       System.currentTimeMillis()));
+               executionTaskExceptionInfoList.add(new 
JobExceptionsInfo.ExecutionExceptionInfo(
+                       "exception2",
+                       "task2",
+                       "location2",
+                       System.currentTimeMillis()));
+               return new JobExceptionsInfo(
+                       "root exception",
+                       System.currentTimeMillis(),
+                       executionTaskExceptionInfoList,
+                       false);
+       }
+}

Reply via email to