Repository: asterixdb Updated Branches: refs/heads/master d8192749d -> 2c1c263b0
[ASTERIXDB-2002][HYR] Report failures during task start - user model changes: no - storage format changes: no - interface changes: no details: - failures that happened before the task object was created were never reported: because the task object was still null, reporting them threw a NullPointerException instead. Change-Id: Ibf79088c1ea08e66a7b130e4836f153ea9603723 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1900 Reviewed-by: Xikui Wang <[email protected]> Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2c1c263b Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2c1c263b Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2c1c263b Branch: refs/heads/master Commit: 2c1c263b06872f6a5c274fcad99877b427b7e491 Parents: d819274 Author: Abdullah Alamoudi <[email protected]> Authored: Tue Jul 25 13:17:13 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Tue Jul 25 16:56:16 2017 -0700 ---------------------------------------------------------------------- .../SuperActivityOperatorNodePushable.java | 5 ++-- .../org/apache/hyracks/control/nc/Task.java | 8 ++++-- .../control/nc/work/NotifyTaskFailureWork.java | 21 ++++++++++---- .../hyracks/control/nc/work/StartTasksWork.java | 10 +++++-- .../AbstractMultiNCIntegrationTest.java | 29 ++++++++++++++----- .../tests/integration/JobFailureTest.java | 2 +- .../integration/LocalityAwareConnectorTest.java | 30 ++++++++++---------- ...onOnCreatePushRuntimeOperatorDescriptor.java | 3 +- 8 files changed, 71 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java index def4c83..83ab532 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java @@ -61,7 +61,8 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable private int inputArity = 0; public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities, - IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) { + IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) + throws HyracksDataException { this.parent = parent; this.startActivities = startActivities; this.ctx = ctx; @@ -76,7 +77,7 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable try { init(); } catch (Exception e) { - throw new IllegalStateException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index 74a628d..bff2794 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -275,7 +275,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { if (!addPendingThread(ct)) { exceptions.add(HyracksDataException.create(TASK_ABORTED, getTaskAttemptId())); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); + ncs.getWorkQueue() + .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId)); return; } ct.setName(displayName + ":" + taskAttemptId + ":" + 0); @@ -353,13 +354,14 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable { for (int i = 0; i < exceptions.size(); i++) { LOGGER.log(Level.WARNING, "Task " + taskAttemptId + " failed with exception" - + (exceptions.size() > 1 ? "s (" + (i + 1) + "/" + exceptions.size() + ")" : ""), + + (exceptions.size() > 1 ? 
"s (" + (i + 1) + "/" + exceptions.size() + ")" : ""), exceptions.get(i)); } } NodeControllerService ncs = joblet.getNodeController(); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, this, exceptions)); + ncs.getWorkQueue() + .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId)); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index e81fa5a..fa8ba28 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -19,7 +19,10 @@ package org.apache.hyracks.control.nc.work; import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataset.IDatasetPartitionManager; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.work.AbstractWork; @@ -27,30 +30,36 @@ import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; public class NotifyTaskFailureWork extends AbstractWork { + private static final Logger LOGGER = Logger.getLogger(NotifyTaskFailureWork.class.getName()); private final NodeControllerService ncs; private final Task task; + private final 
JobId jobId; + private final TaskAttemptId taskId; private final List<Exception> exceptions; - public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions) { + public NotifyTaskFailureWork(NodeControllerService ncs, Task task, List<Exception> exceptions, JobId jobId, + TaskAttemptId taskId) { this.ncs = ncs; this.task = task; this.exceptions = exceptions; + this.jobId = jobId; + this.taskId = taskId; } @Override public void run() { try { - JobId jobId = task.getJobletContext().getJobId(); IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager(); if (dpm != null) { dpm.abortReader(jobId); } - ncs.getClusterController().notifyTaskFailure(jobId, task.getTaskAttemptId(), ncs.getId(), exceptions); - //exceptions.get(0).printStackTrace(); + ncs.getClusterController().notifyTaskFailure(jobId, taskId, ncs.getId(), exceptions); } catch (Exception e) { - e.printStackTrace(); + LOGGER.log(Level.SEVERE, "Failure reporting task failure to cluster controller", e); + } + if (task != null) { + task.getJoblet().removeTask(task); } - task.getJoblet().removeTask(task); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index b55cd4b..c369781 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -102,6 +102,7 @@ public class 
StartTasksWork extends AbstractWork { @Override public void run() { Task task = null; + int taskIndex = 0; try { ncs.updateMaxJobId(jobId); NCServiceContext serviceCtx = ncs.getContext(); @@ -122,7 +123,8 @@ public class StartTasksWork extends AbstractWork { return ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId()); } }; - for (TaskAttemptDescriptor td : taskDescriptors) { + while (taskIndex < taskDescriptors.size()) { + TaskAttemptDescriptor td = taskDescriptors.get(taskIndex); TaskAttemptId taId = td.getTaskAttemptId(); TaskId tid = taId.getTaskId(); ActivityId aid = tid.getActivityId(); @@ -133,6 +135,7 @@ public class StartTasksWork extends AbstractWork { } final int partition = tid.getPartition(); List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid); + task = null; task = new Task(joblet, flags, taId, han.getClass().getName(), ncs.getExecutor(), ncs, createInputChannels(td, inputs)); IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount()); @@ -174,13 +177,16 @@ public class StartTasksWork extends AbstractWork { task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator); joblet.addTask(task); task.start(); + taskIndex++; } } catch (Exception e) { LOGGER.log(Level.WARNING, "Failure starting a task", e); // notify cc of start task failure List<Exception> exceptions = new ArrayList<>(); + exceptions.add(e); ExceptionUtils.setNodeIds(exceptions, ncs.getId()); - ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions)); + TaskAttemptId taskId = taskDescriptors.get(taskIndex).getTaskAttemptId(); + ncs.getWorkQueue().schedule(new NotifyTaskFailureWork(ncs, task, exceptions, jobId, taskId)); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java 
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index 05a7e2d..18479e2 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -59,8 +59,8 @@ public abstract class AbstractMultiNCIntegrationTest { private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName()); - public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004", - "asterix-005", "asterix-006", "asterix-007" }; + public static final String[] ASTERIX_IDS = + { "asterix-001", "asterix-002", "asterix-003", "asterix-004", "asterix-005", "asterix-006", "asterix-007" }; private static ClusterControllerService cc; @@ -103,7 +103,7 @@ public abstract class AbstractMultiNCIntegrationTest { ncConfig.setClusterListenAddress("127.0.0.1"); ncConfig.setDataListenAddress("127.0.0.1"); ncConfig.setResultListenAddress("127.0.0.1"); - ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() }); + ncConfig.setIODevices(new String[] { ioDev.getAbsolutePath() }); asterixNCs[i] = new NodeControllerService(ncConfig); asterixNCs[i].start(); } @@ -138,7 +138,7 @@ public abstract class AbstractMultiNCIntegrationTest { hcc.cancelJob(jobId); } - protected void runTest(JobSpecification spec) throws Exception { + protected void runTest(JobSpecification spec, String expectedErrorMessage) throws Exception { if 
(LOGGER.isLoggable(Level.INFO)) { LOGGER.info(spec.toJSON().asText()); } @@ -180,14 +180,29 @@ public abstract class AbstractMultiNCIntegrationTest { try { bbis.close(); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } - readSize = reader.read(resultFrame); } } - hcc.waitForCompletion(jobId); + boolean expectedExceptionThrown = false; + try { + hcc.waitForCompletion(jobId); + } catch (HyracksDataException hde) { + if (expectedErrorMessage != null) { + if (hde.toString().contains(expectedErrorMessage)) { + expectedExceptionThrown = true; + } else { + throw hde; + } + } else { + throw hde; + } + } + if (expectedErrorMessage != null && !expectedExceptionThrown) { + throw new Exception("Expected error (" + expectedErrorMessage + ") was not thrown"); + } dumpOutputFiles(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java index 871109a..13a103e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/JobFailureTest.java @@ -48,7 +48,7 @@ public class JobFailureTest extends AbstractMultiNCIntegrationTest { spec.connect(conn, sourceOpDesc, 0, sinkOpDesc, 0); spec.addRoot(sinkOpDesc); try { - runTest(spec); + runTest(spec, ExceptionOnCreatePushRuntimeOperatorDescriptor.ERROR_MESSAGE); } 
catch (Exception e) { e.printStackTrace(); throw e; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java index e92113a..67642f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java @@ -69,15 +69,15 @@ import org.junit.Test; public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { - final IFileSplitProvider splitProvider = new ConstantFileSplitProvider( - new FileSplit[] { new ManagedFileSplit(ASTERIX_IDS[0], "data" + File.separator + "tpch0.001" - + File.separator + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[1], "data" + File.separator + "tpch0.001" + File.separator - + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[2], "data" + File.separator + "tpch0.001" + File.separator - + "lineitem.tbl"), - new ManagedFileSplit(ASTERIX_IDS[3], "data" + File.separator + "tpch0.001" + File.separator - + "lineitem.tbl") }); + final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] { + new ManagedFileSplit(ASTERIX_IDS[0], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[1], + "data" + File.separator + "tpch0.001" + File.separator + 
"lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[2], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl"), + new ManagedFileSplit(ASTERIX_IDS[3], + "data" + File.separator + "tpch0.001" + File.separator + "lineitem.tbl") }); final int fileSize = 800 * 1024 * 4; @@ -112,8 +112,8 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { JobSpecification spec = new JobSpecification(); - FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, - desc); + FileScanOperatorDescriptor csvScanner = + new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004"); @@ -163,7 +163,7 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { spec.connect(conn2, grouper, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + runTest(spec, null); } /** @@ -177,8 +177,8 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { JobSpecification spec = new JobSpecification(); - FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, - desc); + FileScanOperatorDescriptor csvScanner = + new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory, desc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004"); @@ -221,7 +221,7 @@ public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest { spec.connect(conn2, grouper, 0, printer, 0); spec.addRoot(printer); - runTest(spec); + runTest(spec, null); } private AbstractSingleActivityOperatorDescriptor getPrinter(JobSpecification spec, String prefix) 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2c1c263b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java index f814cd5..d704671 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/util/ExceptionOnCreatePushRuntimeOperatorDescriptor.java @@ -38,6 +38,7 @@ public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSing private static AtomicInteger createPushRuntime = new AtomicInteger(); private static AtomicInteger initializeCounter = new AtomicInteger(); private static AtomicInteger openCloseCounter = new AtomicInteger(); + public static final String ERROR_MESSAGE = "I throw exceptions"; private final int[] exceptionPartitions; private final boolean sleepOnInitialize; @@ -56,7 +57,7 @@ public class ExceptionOnCreatePushRuntimeOperatorDescriptor extends AbstractSing if (exceptionPartitions != null) { for (int p : exceptionPartitions) { if (p == partition) { - throw new HyracksDataException("I throw exceptions"); + throw new HyracksDataException(ERROR_MESSAGE); } } }
