TEZ-2745. ClassNotFoundException of user code should fail dag (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1b30b17d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1b30b17d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1b30b17d Branch: refs/heads/master Commit: 1b30b17dbbd4d1f58539a0b61fae289d09c1b303 Parents: 7d412b2 Author: Jeff Zhang <[email protected]> Authored: Fri Sep 4 21:18:46 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Fri Sep 4 21:18:46 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../org/apache/tez/client/FrameworkClient.java | 8 +- .../java/org/apache/tez/client/TezClient.java | 3 +- .../org/apache/tez/common/ReflectionUtils.java | 43 ++--- .../tez/dag/api/TezReflectionException.java | 35 ++++ .../dag/api/client/TimelineReaderFactory.java | 19 ++- .../apache/tez/common/TestReflectionUtils.java | 3 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 6 +- .../tez/dag/app/TaskCommunicatorManager.java | 8 +- .../app/dag/RootInputInitializerManager.java | 5 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 27 +++- .../org/apache/tez/dag/app/dag/impl/Edge.java | 13 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 29 +++- .../tez/dag/app/dag/impl/VertexManager.java | 3 +- .../app/launcher/ContainerLauncherManager.java | 25 ++- .../tez/dag/app/rm/TaskSchedulerManager.java | 25 ++- .../dag/app/TestTaskCommunicatorManager.java | 13 +- .../dag/app/TestTaskCommunicatorManager1.java | 6 +- .../dag/app/TestTaskCommunicatorManager2.java | 4 +- .../tez/dag/app/dag/impl/TestDAGImpl.java | 103 ++++++++++++ .../apache/tez/dag/app/dag/impl/TestEdge.java | 3 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 162 +++++++++++++++++-- .../launcher/TestContainerLauncherManager.java | 16 +- .../dag/app/rm/TestTaskSchedulerManager.java | 8 +- .../TestHistoryEventsProtoConversion.java | 7 +- .../hadoop/mapred/split/TezGroupedSplit.java | 13 +- .../split/TezGroupedSplitsInputFormat.java | 14 +- .../hadoop/mapreduce/split/TezGroupedSplit.java | 11 +- .../split/TezGroupedSplitsInputFormat.java | 13 +- .../logging/ats/ATSHistoryLoggingService.java | 3 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 12 +- .../common/resources/MemoryDistributor.java | 4 +- .../common/resources/TestMemoryDistributor.java | 11 +- .../tez/runtime/task/TestTaskExecution2.java | 7 +- .../TestWeightedScalingMemoryDistributor.java | 5 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 3 +- 36 files changed, 511 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7d996ff..72b2c97 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2745. ClassNotFoundException of user code should fail dag TEZ-2754. Tez UI: StartTime & EndTime is not displayed with right format in Graphical View TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS @@ -163,6 +164,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2745. ClassNotFoundException of user code should fail dag TEZ-2761. Tez UI: update the progress on the dag and vertices pages with info from AM TEZ-2731. Fix Tez GenericCounter performance bottleneck TEZ-2752. logUnsuccessful completion in Attempt should write original finish @@ -402,6 +404,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2745. ClassNotFoundException of user code should fail dag TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS TEZ-2742. VertexImpl.finished() terminationCause hides member var of the @@ -618,6 +621,7 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2745. ClassNotFoundException of user code should fail dag TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS TEZ-2742. VertexImpl.finished() terminationCause hides member var of the http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java index e1c7d00..cb20f49 100644 --- a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezReflectionException; +import org.apache.tez.dag.api.TezUncheckedException; @Private public abstract class FrameworkClient { @@ -39,7 +41,11 @@ public abstract class FrameworkClient { boolean isLocal = tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE, TezConfiguration.TEZ_LOCAL_MODE_DEFAULT); if (isLocal) { - return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient"); + try { + return ReflectionUtils.createClazzInstance("org.apache.tez.client.LocalClient"); + } catch (TezReflectionException e) { + throw new TezUncheckedException("Fail to create LocalClient", e); + } } return new TezYarnClient(YarnClient.createYarnClient()); } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index e39cf4f..0c50d86 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -56,6 +56,7 @@ import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; @@ -356,7 +357,7 @@ public class TezClient { historyACLPolicyManager = ReflectionUtils.createClazzInstance( atsHistoryACLManagerClassName); historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration()); - } catch (TezUncheckedException e) { + } catch (TezReflectionException e) { if (!amConfig.getTezConfiguration().getBoolean( TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) { http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java index f1eb0ae..4d89ed4 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.TezUncheckedException; @Private @@ -36,55 +37,44 @@ public class ReflectionUtils { private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>(); @Private - public static Class<?> getClazz(String className) { + public static Class<?> getClazz(String className) throws TezReflectionException { Class<?> clazz = CLAZZ_CACHE.get(className); if (clazz == null) { try { clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { - throw new TezUncheckedException("Unable to load class: " + className, e); + throw new TezReflectionException("Unable to load class: " + className, e); } } return clazz; } - private static <T> T getNewInstance(Class<T> clazz) { + private static <T> T getNewInstance(Class<T> clazz) throws TezReflectionException { T instance; try { instance = clazz.newInstance(); - } catch (InstantiationException e) { - throw new TezUncheckedException( - "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException( + } catch (Exception e) { + throw new TezReflectionException( "Unable to instantiate class with 0 arguments: " + clazz.getName(), e); } return instance; } - private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters) { + private static <T> T getNewInstance(Class<T> clazz, Class<?>[] parameterTypes, Object[] parameters) + throws TezReflectionException { T instance; try { Constructor<T> constructor = clazz.getConstructor(parameterTypes); instance = constructor.newInstance(parameters); - } catch (InstantiationException e) { - throw new TezUncheckedException( - "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e); - } catch (IllegalAccessException e) { - throw new TezUncheckedException( - "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e); - } catch (NoSuchMethodException e) { - throw new TezUncheckedException( - "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e); - } catch (InvocationTargetException e) { - throw new TezUncheckedException( + } catch (Exception e) { + throw new TezReflectionException( "Unable to instantiate class with " + parameters.length + " arguments: " + clazz.getName(), e); } return instance; } @Private - public static <T> T createClazzInstance(String className) { + public static <T> T createClazzInstance(String className) throws TezReflectionException { Class<?> clazz = getClazz(className); @SuppressWarnings("unchecked") T instance = (T) getNewInstance(clazz); @@ -92,7 +82,8 @@ public class ReflectionUtils { } @Private - public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters) { + public static <T> T createClazzInstance(String className, Class<?>[] parameterTypes, Object[] parameters) + throws TezReflectionException { Class<?> clazz = getClazz(className); @SuppressWarnings("unchecked") T instance = (T) getNewInstance(clazz, parameterTypes, parameters); @@ -101,20 +92,20 @@ public class ReflectionUtils { @Private @SuppressWarnings("unchecked") - public static <T> T invokeMethod(Object target, Method method, Object... args) { + public static <T> T invokeMethod(Object target, Method method, Object... args) throws TezReflectionException { try { return (T) method.invoke(target, args); } catch (Exception e) { - throw new TezUncheckedException(e); + throw new TezReflectionException(e); } } @Private - public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) { + public static Method getMethod(Class<?> targetClazz, String methodName, Class<?>... parameterTypes) throws TezReflectionException { try { return targetClazz.getMethod(methodName, parameterTypes); } catch (NoSuchMethodException e) { - throw new TezUncheckedException(e); + throw new TezReflectionException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java new file mode 100644 index 0000000..4d8d1e0 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezReflectionException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.api; + +public class TezReflectionException extends TezException { + + private static final long serialVersionUID = 7744789121243630729L; + + public TezReflectionException(String message) { + super(message); + } + + public TezReflectionException(String message, Throwable cause) { + super(message, cause); + } + + public TezReflectionException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java index f544198..c0569dd 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java @@ -46,7 +46,6 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.TezUncheckedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +166,7 @@ public class TimelineReaderFactory { try { authenticator = getTokenAuthenticator(); authenticator.setConnectionConfigurator(connectionConfigurator); - } catch (TezUncheckedException e) { + } catch (TezException e) { throw new IOException("Failed to get authenticator", e); } @@ -179,13 +178,17 @@ public class TimelineReaderFactory { doAsUser = null; } - HttpURLConnectionFactory connectionFactory = - new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator, - authUgi, doAsUser); + HttpURLConnectionFactory connectionFactory; + try { + connectionFactory = new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator, + authUgi, doAsUser); + } catch (TezException e) { + throw new IOException("Fail to create TokenAuthenticatedURLConnectionFactory", e); + } return new Client(new URLConnectionClientHandler(connectionFactory), clientConfig); } - private static Authenticator getTokenAuthenticator() { + private static Authenticator getTokenAuthenticator() throws TezException { String authenticatorClazzName; if (UserGroupInformation.isSecurityEnabled()) { @@ -208,7 +211,7 @@ public class TimelineReaderFactory { public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator, Authenticator authenticator, UserGroupInformation authUgi, - String doAsUser) { + String doAsUser) throws TezException { this.connConfigurator = connConfigurator; this.authenticator = authenticator; this.authUgi = authUgi; @@ -377,7 +380,7 @@ public class TimelineReaderFactory { isTokenDelegationClassesPresent = true; - } catch (TezUncheckedException e) { + } catch (TezException e) { LOG.info("Could not find class required for token delegation, will fallback to pseudo auth"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java index 253e3a7..2fbd35c 100644 --- a/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java +++ b/tez-api/src/test/java/org/apache/tez/common/TestReflectionUtils.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezReflectionException; import org.junit.Test; public class TestReflectionUtils { @@ -45,7 +46,7 @@ public class TestReflectionUtils { } @Test(timeout = 5000) - public void testConstructorWithParameters() + public void testConstructorWithParameters() throws TezReflectionException { Class<?>[] parameterTypes = new Class[] { String.class, Integer.TYPE }; Object[] parameters = new Object[] { new String("test"), 1 }; http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 04c7b82..fee13c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -59,6 +59,7 @@ import java.util.regex.Pattern; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Lists; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; @@ -1053,7 +1054,8 @@ public class DAGAppMaster extends AbstractService { protected TaskCommunicatorManagerInterface createTaskCommunicatorManager(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> entityDescriptors) { + List<NamedEntityDescriptor> entityDescriptors) + throws TezException { TaskCommunicatorManagerInterface tcm = new TaskCommunicatorManager(context, thh, chh, entityDescriptors); return tcm; @@ -1079,7 +1081,7 @@ public class DAGAppMaster extends AbstractService { protected ContainerLauncherManager createContainerLauncherManager( List<NamedEntityDescriptor> containerLauncherDescriptors, boolean isLocal) throws - UnknownHostException { + UnknownHostException, TezException { return new ContainerLauncherManager(context, taskCommunicatorManager, workingDirectory, containerLauncherDescriptors, isLocal); } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java index 42df259..cfb34ac 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import org.apache.commons.collections4.ListUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; @@ -103,7 +104,7 @@ public class TaskCommunicatorManager extends AbstractService implements public TaskCommunicatorManager(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> taskCommunicatorDescriptors) { + List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException { super(TaskCommunicatorManager.class.getName()); this.context = context; this.taskHeartbeatHandler = thh; @@ -141,7 +142,7 @@ public class TaskCommunicatorManager extends AbstractService implements @VisibleForTesting TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, - int taskCommIndex) { + int taskCommIndex) throws TezException { if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) { return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]); } else if (taskCommDescriptor.getEntityName() @@ -167,7 +168,8 @@ public class TaskCommunicatorManager extends AbstractService implements @VisibleForTesting TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, - NamedEntityDescriptor taskCommDescriptor) { + NamedEntityDescriptor taskCommDescriptor) + throws TezException { LOG.info("Using TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), taskCommDescriptor.getClassName()); Class<? extends TaskCommunicator> taskCommClazz = http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 4ee00fa..4a8a286 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -48,6 +48,7 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.*; import org.apache.tez.dag.api.event.VertexState; @@ -106,7 +107,7 @@ public class RootInputInitializerManager { } public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> - inputs) { + inputs) throws TezException { for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) { InputInitializerContext context = @@ -133,7 +134,7 @@ public class RootInputInitializerManager { @VisibleForTesting protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> - input, InputInitializerContext context) { + input, InputInitializerContext context) throws TezException { InputInitializer initializer = ReflectionUtils .createClazzInstance(input.getControllerDescriptor().getClassName(), new Class[]{InputInitializerContext.class}, new Object[]{context}); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 756ed28..da9c416 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -1467,7 +1467,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } - createDAGEdges(this); + try { + createDAGEdges(this); + } catch (TezException e2) { + String msg = "Fail to create edges, " + ExceptionUtils.getStackTrace(e2); + addDiagnostic(msg); + LOG.error(msg); + trySetTerminationCause(DAGTerminationCause.INIT_FAILURE); + finished(DAGState.FAILED); + return DAGState.FAILED; + } Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(getJobPlan().getEdgeList()); // setup the dag @@ -1489,7 +1498,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } - assignDAGScheduler(this); + try { + assignDAGScheduler(this); + } catch (TezException e1) { + String msg = "Fail to assign DAGScheduler for dag:" + dagName + " due to " + + ExceptionUtils.getStackTrace(e1); + LOG.error(msg); + addDiagnostic(msg); + trySetTerminationCause(DAGTerminationCause.INIT_FAILURE); + finished(DAGState.FAILED); + return DAGState.FAILED; + } for (Map.Entry<String, VertexGroupInfo> entry : vertexGroups.entrySet()) { String groupName = entry.getKey(); @@ -1510,7 +1529,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return DAGState.INITED; } - private void createDAGEdges(DAGImpl dag) { + private void createDAGEdges(DAGImpl dag) throws TezException { for (EdgePlan edgePlan : dag.getJobPlan().getEdgeList()) { EdgeProperty edgeProperty = DagTypeConverters .createEdgePropertyMapFromDAGPlan(edgePlan); @@ -1521,7 +1540,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } } - private static void assignDAGScheduler(DAGImpl dag) { + private static void assignDAGScheduler(DAGImpl dag) throws TezException { String dagSchedulerClassName = dag.dagConf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT); LOG.info("Using DAG Scheduler: " + dagSchedulerClassName); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java index da74a46..0be7790 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java @@ -35,6 +35,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginContext; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand; import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; @@ -119,14 +120,14 @@ public class Edge { .newConcurrentMap(); @SuppressWarnings("rawtypes") - public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) { + public Edge(EdgeProperty edgeProperty, EventHandler eventHandler, Configuration conf) throws TezException { this.edgeProperty = edgeProperty; this.eventHandler = eventHandler; this.conf = conf; createEdgeManager(); } - private void createEdgeManager() { + private void createEdgeManager() throws TezException { switch (edgeProperty.getDataMovementType()) { case ONE_TO_ONE: edgeManagerContext = new EdgeManagerPluginContextImpl(null); @@ -160,7 +161,7 @@ public class Edge { default: String message = "Unknown edge data movement type: " + edgeProperty.getDataMovementType(); - throw new TezUncheckedException(message); + throw new TezException(message); } } @@ -182,7 +183,11 @@ public class Edge { public synchronized void setEdgeProperty(EdgeProperty newEdgeProperty) throws AMUserCodeException { this.edgeProperty = newEdgeProperty; boolean wasUnInitialized = (edgeManager == null); - createEdgeManager(); + try { + createEdgeManager(); + } catch (TezException e) { + throw new AMUserCodeException(Source.EdgeManager, e); + } initialize(); if (wasUnInitialized) { sendEvent(new VertexEventNullEdgeInitialized(sourceVertex.getVertexId(), this, http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f4dd7dc..3dae42b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -75,6 +75,7 @@ import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.Scope; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; @@ -2469,7 +2470,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - assignVertexManager(); + try { + assignVertexManager(); + } catch (TezException e1) { + String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1); + LOG.error(msg); + return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg); + } try { vertexManager.initialize(); @@ -2512,7 +2519,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl return VertexState.INITED; } - private void assignVertexManager() { + private void assignVertexManager() throws TezException { boolean hasBipartite = false; boolean hasOneToOne = false; boolean hasCustom = false; @@ -3359,7 +3366,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (vertex.inputsWithInitializers != null) { LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); - vertex.setupInputInitializerManager(); + try { + vertex.setupInputInitializerManager(); + } catch (TezException e) { + String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e); + LOG.info(msg); + return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg); + } return VertexState.INITIALIZING; } else { boolean hasOneToOneUninitedSource = false; @@ -3390,7 +3403,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl // this block may return VertexState.INITIALIZING if (vertex.inputsWithInitializers != null) { LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); - vertex.setupInputInitializerManager(); + try { + vertex.setupInputInitializerManager(); + } catch (TezException e) { + String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e); + LOG.error(msg); + return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg); + } return VertexState.INITIALIZING; } if (!vertex.uninitializedEdges.isEmpty()) { @@ -4560,7 +4579,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } - private void setupInputInitializerManager() { + private void setupInputInitializerManager() throws TezException { rootInputInitializerManager = createRootInputInitializerManager( getDAG().getName(), getName(), getVertexId(), eventHandler, getTotalTasks(), http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index bb512a9..32f7a42 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -43,6 +43,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.InputInitializerDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -368,7 +369,7 @@ public class VertexManager { } public VertexManager(VertexManagerPluginDescriptor pluginDesc, UserGroupInformation dagUgi, - Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) { + Vertex managedVertex, AppContext appContext, StateChangeNotifier stateChangeNotifier) throws TezException { checkNotNull(pluginDesc, "pluginDesc is null"); checkNotNull(managedVertex, "managedVertex is null"); checkNotNull(appContext, "appContext is null"); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 15a10bd..9e56f44 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -14,19 +14,19 @@ package org.apache.tez.dag.app.launcher; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.util.List; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; @@ -70,7 +70,7 @@ public class ContainerLauncherManager extends AbstractService TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isPureLocalMode) { + boolean isPureLocalMode) throws TezException { super(ContainerLauncherManager.class.getName()); this.appContext = context; @@ -101,7 +101,7 @@ public class ContainerLauncherManager extends AbstractService TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, int containerLauncherIndex, - boolean isPureLocalMode) { + boolean isPureLocalMode) throws TezException { if (containerLauncherDescriptor.getEntityName().equals( TezConstants.getTezYarnServicePluginName())) { return createYarnContainerLauncher(containerLauncherContext); @@ -144,20 +144,13 @@ public class ContainerLauncherManager extends AbstractService @VisibleForTesting @SuppressWarnings("unchecked") ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext, - NamedEntityDescriptor containerLauncherDescriptor) { + NamedEntityDescriptor containerLauncherDescriptor) + throws TezException { LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(), containerLauncherDescriptor.getClassName()); - Class<? extends ContainerLauncher> containerLauncherClazz = - (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz( - containerLauncherDescriptor.getClassName()); - try { - Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz - .getConstructor(ContainerLauncherContext.class); - return ctor.newInstance(containerLauncherContext); - } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { - throw new TezUncheckedException(e); - } - + return ReflectionUtils.createClazzInstance(containerLauncherDescriptor.getClassName(), + new Class[]{ContainerLauncherContext.class}, + new Object[]{containerLauncherContext}); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 29143a2..04d7089 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -18,8 +18,6 @@ package org.apache.tez.dag.app.rm; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.List; @@ -33,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService; @@ -56,6 +55,7 @@ import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity; @@ -392,7 +392,7 @@ public class TaskSchedulerManager extends AbstractService implements AppContext appContext, NamedEntityDescriptor taskSchedulerDescriptor, long customAppIdIdentifier, - int schedulerId) { + int schedulerId) throws TezException { TaskSchedulerContext rawContext = new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl, customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload()); @@ -429,24 +429,17 @@ public class TaskSchedulerManager extends AbstractService implements @SuppressWarnings("unchecked") TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, NamedEntityDescriptor taskSchedulerDescriptor, - int schedulerId) { + int schedulerId) throws TezException { LOG.info("Creating custom TaskScheduler {}:{}", taskSchedulerDescriptor.getEntityName(), taskSchedulerDescriptor.getClassName()); - Class<? extends TaskScheduler> taskSchedulerClazz = - (Class<? extends TaskScheduler>) ReflectionUtils - .getClazz(taskSchedulerDescriptor.getClassName()); - try { - Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz - .getConstructor(TaskSchedulerContext.class); - return ctor.newInstance(taskSchedulerContext); - } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { - throw new TezUncheckedException(e); - } + return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(), + new Class[]{TaskSchedulerContext.class}, + new Object[]{taskSchedulerContext}); } @VisibleForTesting protected void instantiateSchedulers(String host, int port, String trackingUrl, - AppContext appContext) { + AppContext appContext) throws TezException { // Iterate over the list and create all the taskSchedulers int j = 0; for (int i = 0; i < taskSchedulerDescriptors.length; i++) { @@ -467,7 +460,7 @@ public class TaskSchedulerManager extends AbstractService implements @Override - public synchronized void serviceStart() { + public synchronized void serviceStart() throws Exception { InetSocketAddress serviceAddr = clientService.getBindAddress(); dagAppMaster = appContext.getAppMaster(); // if web service is enabled then set tracking url. else disable it (value = ""). http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java index be7adde..1cd8bb1 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java @@ -47,6 +47,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskCommunicator; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -66,7 +67,7 @@ public class TestTaskCommunicatorManager { } @Test(timeout = 5000) - public void testNoTaskCommSpecified() throws IOException { + public void testNoTaskCommSpecified() throws IOException, TezException { AppContext appContext = mock(AppContext.class); TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class); @@ -83,7 +84,7 @@ public class TestTaskCommunicatorManager { } @Test(timeout = 5000) - public void testCustomTaskCommSpecified() throws IOException { + public void testCustomTaskCommSpecified() throws IOException, TezException { AppContext appContext = mock(AppContext.class); TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class); @@ -118,7 +119,7 @@ public class TestTaskCommunicatorManager { } @Test(timeout = 5000) - public void testMultipleTaskComms() throws IOException { + public void testMultipleTaskComms() throws IOException, TezException { AppContext appContext = mock(AppContext.class); TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class); @@ -250,13 +251,13 @@ public class TestTaskCommunicatorManager { public TaskCommManagerForMultipleCommTest(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> taskCommunicatorDescriptors) { + List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException { super(context, thh, chh, taskCommunicatorDescriptors); } @Override TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, - int taskCommIndex) { + int taskCommIndex) throws TezException { numTaskComms.incrementAndGet(); boolean added = taskCommIndices.add(taskCommIndex); assertTrue("Cannot add multiple taskComms with the same index", added); @@ -283,7 +284,7 @@ public class TestTaskCommunicatorManager { @Override TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, - NamedEntityDescriptor taskCommDescriptor) { + NamedEntityDescriptor taskCommDescriptor) throws TezException { taskCommContexts.add(taskCommunicatorContext); TaskCommunicator spyComm = spy(super.createCustomTaskCommunicator(taskCommunicatorContext, taskCommDescriptor)); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java index e8ce429..117d3b3 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java @@ -107,7 +107,7 @@ public class TestTaskCommunicatorManager1 { TezTaskAttemptID taskAttemptID; @Before - public void setUp() { + public void setUp() throws TezException { appId = ApplicationId.newInstance(1000, 1); appAttemptId = ApplicationAttemptId.newInstance(appId, 1); dag = mock(DAG.class); @@ -304,7 +304,7 @@ public class TestTaskCommunicatorManager1 { // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well. @Test (timeout= 5000) - public void testPortRange_NotSpecified() throws IOException { + public void testPortRange_NotSpecified() throws IOException, TezException { Configuration conf = new Configuration(); JobTokenIdentifier identifier = new JobTokenIdentifier(new Text( "fakeIdentifier")); @@ -396,7 +396,7 @@ public class TestTaskCommunicatorManager1 { public TaskCommunicatorManagerInterfaceImplForTest(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, - List<NamedEntityDescriptor> taskCommDescriptors) { + List<NamedEntityDescriptor> taskCommDescriptors) throws TezException { super(context, thh, chh, taskCommDescriptors); } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java index d75b0e5..a7652a0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -41,6 +42,7 @@ import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.dag.app.dag.DAG; @@ -58,7 +60,7 @@ import org.mockito.ArgumentCaptor; public class TestTaskCommunicatorManager2 { @Test(timeout = 5000) - public void testTaskAttemptFailedKilled() throws IOException { + public void testTaskAttemptFailedKilled() throws IOException, TezException { ApplicationId appId = ApplicationId.newInstance(1000, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); Credentials credentials = new Credentials(); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index ac4f61b..676ae33 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -744,6 +744,76 @@ public class TestDAGImpl { return dag; } + // v1 -> v2 + private DAGPlan createDAGWithNonExistEdgeManager() { + LOG.info("Setting up dag plan with non-exist edgemanager"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("testverteximpl") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host1") + .addRack("rack1") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .addOutEdgeId("e1") + .build() + ) + .addVertex( + VertexPlan.newBuilder() + .setName("vertex2") + .setType(PlanVertexType.NORMAL) + .setProcessorDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName("x2.y2")) + .addTaskLocationHint( + PlanTaskLocationHint.newBuilder() + .addHost("host2") + .addRack("rack2") + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(2) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("foo") + .setTaskModule("x2.y2") + .build() + ) + .addInEdgeId("e1") + .build() + ) + .addEdge( + EdgePlan.newBuilder() + .setEdgeManager(TezEntityDescriptorProto.newBuilder() + .setClassName("non-exist-edge-manager") + ) + .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")) + .setInputVertexName("vertex1") + .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1")) + .setOutputVertexName("vertex2") + .setDataMovementType(PlanEdgeDataMovementType.CUSTOM) + .setId("e1") + .setDataSourceType(PlanEdgeDataSourceType.PERSISTED) + .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL) + .build() + ) + .build(); + return dag; + } + @BeforeClass public static void beforeClass() { MockDNSToSwitchMapping.initializeMockRackResolver(); @@ -969,6 +1039,39 @@ public class TestDAGImpl { } } + @Test(timeout = 5000) + public void testNonExistEdgeManagerPlugin() { + dagPlan = createDAGWithNonExistEdgeManager(); + dag = new DAGImpl(dagId, conf, dagPlan, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, + fsTokens, clock, "user", thh, appContext); + dag.entityUpdateTracker = new StateChangeNotifierForTest(dag); + doReturn(dag).when(appContext).getCurrentDAG(); + + dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT)); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause()); + Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "") + .contains("java.lang.ClassNotFoundException: non-exist-edge-manager")); + } + + @Test (timeout = 5000) + public void testNonExistDAGScheduler() { + conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, "non-exist-dag-scheduler"); + dag = new DAGImpl(dagId, conf, dagPlan, + dispatcher.getEventHandler(), taskCommunicatorManagerInterface, + fsTokens, clock, "user", thh, appContext); + dag.entityUpdateTracker = new StateChangeNotifierForTest(dag); + doReturn(dag).when(appContext).getCurrentDAG(); + + dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT)); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGState.FAILED, dag.getState()); + Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause()); + Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "") + .contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler")); + } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testVertexCompletion() { http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java index eb03d1e..f53e505 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestEdge.java @@ -54,6 +54,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.Vertex; @@ -158,7 +159,7 @@ public class TestEdge { @SuppressWarnings({ "rawtypes" }) @Test (timeout = 5000) - public void testCompositeEventHandling() throws AMUserCodeException { + public void testCompositeEventHandling() throws TezException { EventHandler eventHandler = mock(EventHandler.class); EdgeProperty edgeProp = EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, mock(OutputDescriptor.class), http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index eb68a6f..a54c56a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -87,6 +87,7 @@ import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.RootInputLeafOutput; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; @@ -575,6 +576,100 @@ public class TestVertexImpl { return dag; } + private DAGPlan createDAGPlanWithNonExistInputInitializer() { + LOG.info("Setting up dag plan with non exist inputinitializer"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("initializerWith0Tasks") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addInputs( + RootInputLeafOutputProto.newBuilder() + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + "non-exist-input-initializer")) + .setName("input1") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("InputClazz") + .build() + ) + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .build() + ).build(); + return dag; + } + + private DAGPlan createDAGPlanWithNonExistOutputCommitter() { + LOG.info("Setting up dag plan with non exist output committer"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("initializerWith0Tasks") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .addOutputs( + RootInputLeafOutputProto.newBuilder() + .setControllerDescriptor( + TezEntityDescriptorProto.newBuilder().setClassName( + "non-exist-output-committer")) + .setName("output1") + .setIODescriptor( + TezEntityDescriptorProto.newBuilder() + .setClassName("OutputClazz") + .build() + ) + .build() + ) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .build() + ).build(); + return dag; + } + + private DAGPlan createDAGPlanWithNonExistVertexManager() { + LOG.info("Setting up dag plan with non-exist VertexManager"); + DAGPlan dag = DAGPlan.newBuilder() + .setName("initializerWith0Tasks") + .addVertex( + VertexPlan.newBuilder() + .setName("vertex1") + .setType(PlanVertexType.NORMAL) + .setTaskConfig( + PlanTaskConfiguration.newBuilder() + .setNumTasks(1) + .setVirtualCores(4) + .setMemoryMb(1024) + .setJavaOpts("") + .setTaskModule("x1.y1") + .build() + ) + .setVertexManagerPlugin(TezEntityDescriptorProto.newBuilder() + .setClassName("non-exist-vertexmanager")) + .build() + ).build(); + return dag; + } + private DAGPlan createDAGPlanWithMixedEdges() { LOG.info("Setting up mixed edge dag plan"); org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create("MixedEdges"); @@ -2151,7 +2246,7 @@ public class TestVertexImpl { } @SuppressWarnings({ "unchecked", "rawtypes" }) - public void setupPostDagCreation() throws AMUserCodeException { + public void setupPostDagCreation() throws TezException { String dagName = "dag0"; // dispatcher may be created multiple times (setupPostDagCreation may be called multiples) if (dispatcher != null) { @@ -2266,7 +2361,7 @@ public class TestVertexImpl { } @Before - public void setup() throws AMUserCodeException { + public void setup() throws TezException { useCustomInitializer = false; customInitializer = null; setupPreDagCreation(); @@ -2385,6 +2480,45 @@ public class TestVertexImpl { .getOutputDescriptor().getClassName())); } + @Test(timeout=5000) + public void testNonExistVertexManager() throws TezException { + setupPreDagCreation(); + dagPlan = createDAGPlanWithNonExistVertexManager(); + setupPostDagCreation(); + VertexImpl v1 = vertices.get("vertex1"); + v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause()); + Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"") + .contains("java.lang.ClassNotFoundException: non-exist-vertexmanager")); + } + + @Test(timeout=5000) + public void testNonExistInputInitializer() throws TezException { + setupPreDagCreation(); + dagPlan = createDAGPlanWithNonExistInputInitializer(); + setupPostDagCreation(); + VertexImpl v1 = vertices.get("vertex1"); + v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause()); + Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"") + .contains("java.lang.ClassNotFoundException: non-exist-input-initializer")); + } + + @Test(timeout=5000) + public void testNonExistOutputCommitter() throws TezException { + setupPreDagCreation(); + dagPlan = createDAGPlanWithNonExistOutputCommitter(); + setupPostDagCreation(); + VertexImpl v1 = vertices.get("vertex1"); + v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT)); + Assert.assertEquals(VertexState.FAILED, v1.getState()); + Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, v1.getTerminationCause()); + Assert.assertTrue(StringUtils.join(v1.getDiagnostics(),"") + .contains("java.lang.ClassNotFoundException: non-exist-output-committer")); + } + class TestUpdateListener implements VertexStateUpdateListener { List<VertexStateUpdate> events = Lists.newLinkedList(); @Override @@ -3734,7 +3868,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testVertexManagerHeuristic() throws AMUserCodeException { + public void testVertexManagerHeuristic() throws TezException { setupPreDagCreation(); dagPlan = createDAGPlanWithMixedEdges(); setupPostDagCreation(); @@ -3991,7 +4125,7 @@ public class TestVertexImpl { } @Test(timeout = 10000) - public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, AMUserCodeException { + public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, TezException { useCustomInitializer = true; customInitializer = new RootInitializerSettingParallelismTo0(null); RootInitializerSettingParallelismTo0 initializer = @@ -4533,7 +4667,7 @@ public class TestVertexImpl { * If broadcast, one-to-one or custom edges are present in source, tasks should not start until * 1 task from each source vertex is complete. */ - public void testTaskSchedulingWithCustomEdges() throws AMUserCodeException { + public void testTaskSchedulingWithCustomEdges() throws TezException { setupPreDagCreation(); dagPlan = createCustomDAGWithCustomEdges(); setupPostDagCreation(); @@ -5359,7 +5493,7 @@ public class TestVertexImpl { hasShutDown = true; } - public void failInputInitialization() { + public void failInputInitialization() throws TezException { super.runInputInitializers(inputs); eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs .get(0).getName(), @@ -5408,7 +5542,7 @@ public class TestVertexImpl { } @Test(timeout=5000) - public void testVertexGroupInput() throws AMUserCodeException { + public void testVertexGroupInput() throws TezException { setupPreDagCreation(); dagPlan = createVertexGroupDAGPlan(); setupPostDagCreation(); @@ -5558,7 +5692,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testInitStartRace() throws AMUserCodeException { + public void testInitStartRace() throws TezException { // Race when a source vertex manages to start before the target vertex has // been initialized setupPreDagCreation(); @@ -5581,7 +5715,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testInitStartRace2() throws AMUserCodeException { + public void testInitStartRace2() throws TezException { // Race when a source vertex manages to start before the target vertex has // been initialized setupPreDagCreation(); @@ -5608,7 +5742,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testTez2684() throws AMUserCodeException, IOException { + public void testTez2684() throws IOException, TezException { setupPreDagCreation(); dagPlan = createSamplerDAGPlan2(); setupPostDagCreation(); @@ -5677,7 +5811,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testExceptionFromVM_Initialize() throws AMUserCodeException { + public void testExceptionFromVM_Initialize() throws TezException { useCustomInitializer = true; setupPreDagCreation(); dagPlan = createDAGPlanWithVMException("TestInputInitializer", VMExceptionLocation.Initialize); @@ -5837,7 +5971,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testExceptionFromII_Initialize() throws AMUserCodeException, InterruptedException { + public void testExceptionFromII_Initialize() throws InterruptedException, TezException { useCustomInitializer = true; customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize); EventHandlingRootInputInitializer initializer = @@ -5960,7 +6094,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testExceptionFromII_OnVertexStateUpdated() throws AMUserCodeException, InterruptedException { + public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedException, TezException { useCustomInitializer = true; customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated); EventHandlingRootInputInitializer initializer = @@ -5989,7 +6123,7 @@ public class TestVertexImpl { } @Test(timeout = 5000) - public void testExceptionFromII_InitSucceededAfterInitFailure() throws AMUserCodeException, InterruptedException { + public void testExceptionFromII_InitSucceededAfterInitFailure() throws InterruptedException, TezException { useCustomInitializer = true; customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated); EventHandlingRootInputInitializer initializer = http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java index 4b931d4..a8af808 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; @@ -61,7 +63,7 @@ public class TestContainerLauncherManager { } @Test(timeout = 5000) - public void testNoLaunchersSpecified() throws IOException { + public void testNoLaunchersSpecified() throws IOException, TezException { AppContext appContext = mock(AppContext.class); TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class); @@ -77,7 +79,7 @@ public class TestContainerLauncherManager { } @Test(timeout = 5000) - public void testCustomLauncherSpecified() throws IOException { + public void testCustomLauncherSpecified() throws IOException, TezException { Configuration conf = new Configuration(false); AppContext appContext = mock(AppContext.class); @@ -111,7 +113,7 @@ public class TestContainerLauncherManager { } @Test(timeout = 5000) - public void testMultipleContainerLaunchers() throws IOException { + public void testMultipleContainerLaunchers() throws IOException, TezException { Configuration conf = new Configuration(false); conf.set("testkey", "testvalue"); UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf); @@ -261,7 +263,7 @@ public class TestContainerLauncherManager { String workingDirectory, List<NamedEntityDescriptor> containerLauncherDescriptors, boolean isPureLocalMode) throws - UnknownHostException { + UnknownHostException, TezException { super(context, taskCommunicatorManagerInterface, workingDirectory, containerLauncherDescriptors, isPureLocalMode); } @@ -273,7 +275,7 @@ public class TestContainerLauncherManager { TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, int containerLauncherIndex, - boolean isPureLocalMode) { + boolean isPureLocalMode) throws TezException { numContainerLaunchers.incrementAndGet(); boolean added = containerLauncherIndices.add(containerLauncherIndex); assertTrue("Cannot add multiple launchers with the same index", added); @@ -306,7 +308,7 @@ public class TestContainerLauncherManager { @Override ContainerLauncher createCustomContainerLauncher( ContainerLauncherContext containerLauncherContext, - NamedEntityDescriptor containerLauncherDescriptor) { + NamedEntityDescriptor containerLauncherDescriptor) throws TezException { ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher( containerLauncherContext, containerLauncherDescriptor)); testContainerLaunchers.add(spyLauncher); @@ -338,7 +340,7 @@ public class TestContainerLauncherManager { } } - private static class FakeContainerLauncher extends ContainerLauncher { + public static class FakeContainerLauncher extends ContainerLauncher { public FakeContainerLauncher( ContainerLauncherContext containerLauncherContext) { http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 98b7baa..4db51b9 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -63,6 +63,7 @@ import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.app.AppContext; @@ -532,7 +533,7 @@ public class TestTaskSchedulerManager { eq(launchRequest2)); } - private static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager { + public static class TSEHForMultipleSchedulersTest extends TaskSchedulerManager { private final TaskScheduler yarnTaskScheduler; private final TaskScheduler uberTaskScheduler; @@ -562,7 +563,7 @@ public class TestTaskSchedulerManager { AppContext appContext, NamedEntityDescriptor taskSchedulerDescriptor, long customAppIdIdentifier, - int schedulerId) { + int schedulerId) throws TezException { numCreateInvocations.incrementAndGet(); boolean added = seenSchedulers.add(schedulerId); @@ -596,7 +597,8 @@ public class TestTaskSchedulerManager { @Override TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, - NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) { + NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) + throws TezException { taskSchedulerContexts.add(taskSchedulerContext); TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId)); testTaskSchedulers.add(taskScheduler); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 3507d99..b215a06 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.TaskLocationHint; @@ -83,7 +84,7 @@ public class TestHistoryEventsProtoConversion { TestHistoryEventsProtoConversion.class); - private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException { + private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException, TezException { ByteArrayOutputStream os = new ByteArrayOutputStream(); HistoryEvent deserializedEvent = null; event.toProtoStream(os); @@ -100,7 +101,7 @@ public class TestHistoryEventsProtoConversion { } private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent) - throws IOException { + throws IOException, TezException { SummaryEvent event = (SummaryEvent) historyEvent; ByteArrayOutputStream os = new ByteArrayOutputStream(); HistoryEvent deserializedEvent = null; @@ -757,7 +758,7 @@ public class TestHistoryEventsProtoConversion { } } - private void testDAGRecoveredEvent() { + private void testDAGRecoveredEvent() throws TezException { DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent( ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java index 0c1c327..4f3a0f2 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplit.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; /** @@ -106,10 +107,14 @@ public class TezGroupedSplit implements InputSplit, Configurable { public void readFields(DataInput in) throws IOException { wrappedInputFormatName = Text.readString(in); String inputSplitClassName = Text.readString(in); - Class<? extends InputSplit> clazz = - (Class<? extends InputSplit>) - TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName); - + Class<? extends InputSplit> clazz = null; + try { + clazz = (Class<? extends InputSplit>) + TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName); + } catch (TezException e) { + throw new IOException(e); + } + int numSplits = in.readInt(); wrappedSplits = new ArrayList<InputSplit>(numSplits); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java index 707f9ad..b361aec 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/split/TezGroupedSplitsInputFormat.java @@ -33,7 +33,7 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.split.SplitSizeEstimator; import org.apache.tez.common.ReflectionUtils; -import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.TezException; import com.google.common.base.Preconditions; @@ -93,24 +93,28 @@ public class TezGroupedSplitsInputFormat<K, V> public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { TezGroupedSplit groupedSplit = (TezGroupedSplit) split; - initInputFormatFromSplit(groupedSplit); + try { + initInputFormatFromSplit(groupedSplit); + } catch (TezException e) { + throw new IOException(e); + } return new TezGroupedSplitsRecordReader(groupedSplit, job, reporter); } @SuppressWarnings({ "unchecked", "rawtypes" }) - void initInputFormatFromSplit(TezGroupedSplit split) { + void initInputFormatFromSplit(TezGroupedSplit split) throws TezException { if (wrappedInputFormat == null) { Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) getClassFromName(split.wrappedInputFormatName); try { wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf); } catch (Exception e) { - throw new TezUncheckedException(e); + throw new TezException(e); } } } - static Class<?> getClassFromName(String name) { + static Class<?> getClassFromName(String name) throws TezException { return ReflectionUtils.getClazz(name); } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java index 9275f14..f85bbcd 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplit.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; /** @@ -108,9 +109,13 @@ public class TezGroupedSplit extends InputSplit public void readFields(DataInput in) throws IOException { wrappedInputFormatName = Text.readString(in); String inputSplitClassName = Text.readString(in); - Class<? extends InputSplit> clazz = - (Class<? extends InputSplit>) - TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName); + Class<? extends InputSplit> clazz = null; + try { + clazz = (Class<? extends InputSplit>) + TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName); + } catch (TezException e) { + throw new IOException(e); + } int numSplits = in.readInt(); http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java index 519b52a..8aabbf6 100644 --- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java +++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/TezGroupedSplitsInputFormat.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import com.google.common.base.Preconditions; @@ -126,24 +127,28 @@ public class TezGroupedSplitsInputFormat<K, V> extends InputFormat<K, V> public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { TezGroupedSplit groupedSplit = (TezGroupedSplit) split; - initInputFormatFromSplit(groupedSplit); + try { + initInputFormatFromSplit(groupedSplit); + } catch (TezException e) { + throw new IOException(e); + } return new TezGroupedSplitsRecordReader(groupedSplit, context); } @SuppressWarnings({ "rawtypes", "unchecked" }) - void initInputFormatFromSplit(TezGroupedSplit split) { + void initInputFormatFromSplit(TezGroupedSplit split) throws TezException { if (wrappedInputFormat == null) { Class<? extends InputFormat> clazz = (Class<? extends InputFormat>) getClassFromName(split.wrappedInputFormatName); try { wrappedInputFormat = org.apache.hadoop.util.ReflectionUtils.newInstance(clazz, conf); } catch (Exception e) { - throw new TezUncheckedException(e); + throw new TezException(e); } } } - static Class<?> getClassFromName(String name) { + static Class<?> getClassFromName(String name) throws TezException { return ReflectionUtils.getClazz(name); } http://git-wip-us.apache.org/repos/asf/tez/blob/1b30b17d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java index 9a2d77e..d0e935f 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java @@ -40,6 +40,7 @@ import org.apache.tez.common.security.HistoryACLPolicyManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezReflectionException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -128,7 +129,7 @@ public class ATSHistoryLoggingService extends HistoryLoggingService { historyACLPolicyManager = ReflectionUtils.createClazzInstance( atsHistoryACLManagerClassName); historyACLPolicyManager.setConf(conf); - } catch (TezUncheckedException e) { + } catch (TezReflectionException e) { LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName + ". ACLs cannot be enforced correctly for history data in Timeline", e); if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
