Repository: tez Updated Branches: refs/heads/branch-0.8 2e483ef2c -> 851b6541b
TEZ-3007. Use AppFinalState.ENDED when unregistering with the RM in session mode (harishjp) (cherry picked from commit 8c4407798ead6b771df6c8eb47bc0775ceaf67b5) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/851b6541 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/851b6541 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/851b6541 Branch: refs/heads/branch-0.8 Commit: 851b6541b5c44ec3a9dc1951b1c4e8497d591a56 Parents: 2e483ef Author: Harish JP <[email protected]> Authored: Wed May 24 11:39:20 2017 +0530 Committer: Harish JP <[email protected]> Committed: Wed May 24 11:42:25 2017 +0530 ---------------------------------------------------------------------- .../apache/tez/hadoop/shim/HadoopShim28.java | 16 ++++++ .../tez/hadoop/shim/TestHadoopShim28.java | 60 ++++++++++++++++++++ .../org/apache/tez/hadoop/shim/HadoopShim.java | 7 ++- .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 10 +++- .../dag/app/rm/TestTaskSchedulerHelpers.java | 3 +- .../dag/app/rm/TestTaskSchedulerManager.java | 13 +++-- 7 files changed, 101 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java ---------------------------------------------------------------------- diff --git a/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java b/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java index 0c599e4..5504c02 100644 --- a/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java +++ b/hadoop-shim-impls/hadoop-shim-2.8/src/main/java/org/apache/tez/hadoop/shim/HadoopShim28.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; public class HadoopShim28 extends HadoopShim { @@ -49,4 +50,19 @@ public class HadoopShim28 extends HadoopShim { } return supportedTypes; } + + @Override + public FinalApplicationStatus applyFinalApplicationStatusCorrection(FinalApplicationStatus orig, + boolean isSessionMode, boolean isError) { + switch (orig) { + case FAILED: + // App is failed if dag failed in non-session mode or there was an error. + return (!isSessionMode || isError) ? + FinalApplicationStatus.FAILED : FinalApplicationStatus.ENDED; + case SUCCEEDED: + return isSessionMode ? FinalApplicationStatus.ENDED : FinalApplicationStatus.SUCCEEDED; + default: + return orig; + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java ---------------------------------------------------------------------- diff --git a/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java b/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java new file mode 100644 index 0000000..c7d8c63 --- /dev/null +++ b/hadoop-shim-impls/hadoop-shim-2.8/src/test/java/org/apache/tez/hadoop/shim/TestHadoopShim28.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.hadoop.shim; + +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.junit.Assert; +import org.junit.Test; + +public class TestHadoopShim28 { + + @Test + public void testApplyFinalApplicationStatusCorrection() { + HadoopShim shim = new HadoopShim28(); + // Session mode success/failure, change to ended + Assert.assertEquals(FinalApplicationStatus.ENDED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.SUCCEEDED, true, false)); + Assert.assertEquals(FinalApplicationStatus.ENDED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, true, false)); + + // Non-session mode success/failure, retain success/failure + Assert.assertEquals(FinalApplicationStatus.SUCCEEDED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.SUCCEEDED, false, false)); + Assert.assertEquals(FinalApplicationStatus.FAILED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, false, false)); + + // Session and non-session mode error, retain failed. + Assert.assertEquals(FinalApplicationStatus.FAILED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, true, true)); + Assert.assertEquals(FinalApplicationStatus.FAILED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.FAILED, false, true)); + + // Session and non-session mode killed is killed. + Assert.assertEquals(FinalApplicationStatus.KILLED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.KILLED, true, false)); + Assert.assertEquals(FinalApplicationStatus.KILLED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.KILLED, false, false)); + + // Session and non-session mode undefined is undefined. + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.UNDEFINED, true, false)); + Assert.assertEquals(FinalApplicationStatus.UNDEFINED, + shim.applyFinalApplicationStatusCorrection(FinalApplicationStatus.UNDEFINED, false, false)); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java ---------------------------------------------------------------------- diff --git a/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java b/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java index 7314c2d..47da74a 100644 --- a/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java +++ b/hadoop-shim/src/main/java/org/apache/tez/hadoop/shim/HadoopShim.java @@ -23,8 +23,7 @@ import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @Public @Unstable @@ -57,4 +56,8 @@ public abstract class HadoopShim { return null; } + public FinalApplicationStatus applyFinalApplicationStatusCorrection(FinalApplicationStatus orig, + boolean isSessionMode, boolean isError) { + return orig; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 7ad6405..cbf7353 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -651,7 +651,7 @@ public class DAGAppMaster extends AbstractService { List<NamedEntityDescriptor> taskSchedulerDescriptors) { return new TaskSchedulerManager(context, clientRpcServer, dispatcher.getEventHandler(), containerSignatureMatcher, webUIService, - taskSchedulerDescriptors, isLocal); + taskSchedulerDescriptors, isLocal, hadoopShim); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index e68c9b8..d0d69d0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -89,6 +89,8 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded; import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded; import org.apache.tez.dag.app.web.WebUIService; import org.apache.tez.dag.records.TaskAttemptTerminationCause; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import com.google.common.base.Preconditions; @@ -132,6 +134,7 @@ public class TaskSchedulerManager extends AbstractService implements // Custom AppIds to avoid container conflicts if there's multiple sources private final long SCHEDULER_APP_ID_BASE = 111101111; private final long SCHEDULER_APP_ID_INCREMENT = 111111111; + private final HadoopShim hadoopShim; BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>(); @@ -160,6 +163,7 @@ public class TaskSchedulerManager extends AbstractService implements this.webUI = null; this.historyUrl = null; this.isLocalMode = false; + this.hadoopShim = new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim(); } /** @@ -179,7 +183,8 @@ public class TaskSchedulerManager extends AbstractService implements ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, List<NamedEntityDescriptor> schedulerDescriptors, - boolean isLocalMode) { + boolean isLocalMode, + HadoopShim hadoopShim) { super(TaskSchedulerManager.class.getName()); Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(), "TaskSchedulerDescriptors must be specified"); @@ -190,6 +195,7 @@ public class TaskSchedulerManager extends AbstractService implements this.webUI = webUI; this.historyUrl = getHistoryUrl(); this.isLocalMode = isLocalMode; + this.hadoopShim = hadoopShim; this.appCallbackExecutor = createAppCallbackExecutorService(); if (this.webUI != null) { this.webUI.setHistoryUrl(this.historyUrl); @@ -798,6 +804,8 @@ public class TaskSchedulerManager extends AbstractService implements } else { finishState = FinalApplicationStatus.UNDEFINED; } + finishState = hadoopShim.applyFinalApplicationStatusCorrection(finishState, + dagAppMaster.isSession(), appMasterState == DAGAppMasterState.ERROR); List<String> diagnostics = dagAppMaster.getDiagnostics(); if(diagnostics != null) { for (String s : diagnostics) { http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java index d8170e3..2cabb27 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java @@ -70,6 +70,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginError; import org.apache.tez.serviceplugins.api.TaskScheduler; @@ -144,7 +145,7 @@ class TestTaskSchedulerHelpers { UserPayload defaultPayload) { super(appContext, null, eventHandler, containerSignatureMatcher, null, Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)), - false); + false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()); this.amrmClientAsync = amrmClientAsync; this.containerSignatureMatcher = containerSignatureMatcher; this.defaultPayload = defaultPayload; http://git-wip-us.apache.org/repos/asf/tez/blob/851b6541/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 791bb7f..8539d48 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -94,6 +94,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.ServicePluginException; @@ -130,7 +131,8 @@ public class TestTaskSchedulerManager { ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) { super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, - Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false); + Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false, + new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()); } @Override @@ -574,7 +576,8 @@ public class TestTaskSchedulerManager { TaskSchedulerManager taskSchedulerManager = new TaskSchedulerManager(appContext, dagClientServer, eventHandler, - mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false) { + mock(ContainerSignatureMatcher.class), mock(WebUIService.class), list, false, + new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()) { @Override TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) { // Avoid wrapping in threads @@ -638,8 +641,7 @@ public class TestTaskSchedulerManager { doReturn(address).when(mockClientService).getBindAddress(); TaskSchedulerManager taskSchedulerManager = new TaskSchedulerManager(taskScheduler, appContext, mock(ContainerSignatureMatcher.class), - mockClientService, - Executors.newFixedThreadPool(1)) { + mockClientService, Executors.newFixedThreadPool(1)) { @Override protected void instantiateSchedulers(String host, int port, String trackingUrl, AppContext appContext) throws TezException { @@ -726,7 +728,8 @@ public class TestTaskSchedulerManager { List<NamedEntityDescriptor> schedulerDescriptors, boolean isPureLocalMode) { super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, - schedulerDescriptors, isPureLocalMode); + schedulerDescriptors, isPureLocalMode, + new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim()); yarnTaskScheduler = mock(TaskScheduler.class); uberTaskScheduler = mock(TaskScheduler.class); }
