Repository: tez Updated Branches: refs/heads/master edddea808 -> 28cd991b8
TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/28cd991b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/28cd991b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/28cd991b Branch: refs/heads/master Commit: 28cd991b86c0e216e80f9246d8c0bddaa5b0f97c Parents: edddea8 Author: Hitesh Shah <[email protected]> Authored: Thu Aug 6 11:08:30 2015 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Aug 6 11:08:30 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 3 +- .../java/org/apache/tez/common/RPCUtil.java | 173 ++++++++++++++++++ .../tez/dag/api/client/DAGClientImpl.java | 4 + .../dag/api/client/rpc/DAGClientRPCImpl.java | 25 +-- .../java/org/apache/tez/common/TestRPCUtil.java | 181 +++++++++++++++++++ .../org/apache/tez/test/TestAMRecovery.java | 3 +- 7 files changed, 376 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d986913..5d3c4f4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,7 @@ INCOMPATIBLE CHANGES TEZ-2650. Timing details on Vertex state changes ALL CHANGES: + TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM. TEZ-2630. TezChild receives IP address instead of FQDN. TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized. TEZ-2172. FetcherOrderedGrouped using List to store InputAttemptIdentifier can lead to some inefficiency during remove() operation. http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index aad6e76..2590879 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.annotation.Nullable; +import org.apache.tez.common.RPCUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -473,7 +474,7 @@ public class TezClient { dagId = response.getDagId(); } } catch (ServiceException e) { - throw new TezException(e); + RPCUtil.unwrapAndThrowException(e); } LOG.info("Submitted dag to TezSession" + ", sessionName=" + clientName http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java new file mode 100644 index 0000000..caeb822 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java @@ -0,0 +1,173 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.common; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import org.apache.hadoop.ipc.RemoteException; +import org.apache.tez.dag.api.DAGNotRunningException; +import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezException; + +import com.google.protobuf.ServiceException; + +public class RPCUtil { + + /** + * Returns an instance of {@link TezException} + */ + public static TezException getRemoteException(Throwable t) { + return new TezException(t); + } + + /** + * Returns an instance of {@link TezException} + */ + public static TezException getRemoteException(String message) { + return new TezException(message); + } + + private static <T extends Throwable> T instantiateException( + Class<? extends T> cls, RemoteException re) throws RemoteException { + try { + Constructor<? extends T> cn = cls.getConstructor(String.class); + cn.setAccessible(true); + T ex = cn.newInstance(re.getMessage()); + ex.initCause(re); + return ex; + // RemoteException contains useful information as against the + // java.lang.reflect exceptions. + } catch (NoSuchMethodException e) { + throw re; + } catch (IllegalArgumentException e) { + throw re; + } catch (SecurityException e) { + throw re; + } catch (InstantiationException e) { + throw re; + } catch (IllegalAccessException e) { + throw re; + } catch (InvocationTargetException e) { + throw re; + } + } + + private static <T extends TezException> T instantiateTezException( + Class<? extends T> cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static <T extends IOException> T instantiateIOException( + Class<? extends T> cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static <T extends RuntimeException> T instantiateRuntimeException( + Class<? extends T> cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + private static <T extends SessionNotRunning> T instantiateSessionNotRunningException( + Class<? extends T> cls, RemoteException re) throws RemoteException { + return instantiateException(cls, re); + } + + + /** + * Utility method that unwraps and returns appropriate exceptions. + * + * @param se + * ServiceException + * @return An instance of the actual exception, which will be a subclass of + * {@link TezException} or {@link IOException} + */ + public static Void unwrapAndThrowException(ServiceException se) + throws IOException, TezException { + + Throwable cause = se.getCause(); + if (cause == null) { + // SE generated by the RPC layer itself. + throw new IOException(se); + } else { + if (cause instanceof RemoteException) { + RemoteException re = (RemoteException) cause; + Class<?> realClass = null; + try { + realClass = Class.forName(re.getClassName()); + } catch (ClassNotFoundException cnf) { + // Assume this to be a new exception type added to Tez. This isn't + // absolutely correct since the RPC layer could add an exception as + // well. + throw instantiateTezException(TezException.class, re); + } + + if (SessionNotRunning.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(SessionNotRunning.class), re); + } else if (DAGNotRunningException.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(DAGNotRunningException.class), re); + } else if (TezException.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(TezException.class), re); + } else if (IOException.class.isAssignableFrom(realClass)) { + throw instantiateIOException(realClass.asSubclass(IOException.class), + re); + } else if (RuntimeException.class.isAssignableFrom(realClass)) { + throw instantiateRuntimeException( + realClass.asSubclass(RuntimeException.class), re); + } else { + throw re; + } + // RemoteException contains useful information as against the + // java.lang.reflect exceptions. + + } else if (cause instanceof IOException) { + // RPC Client exception. + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + // RPC RuntimeException + throw (RuntimeException) cause; + } else { + // Should not be generated. + throw new IOException(se); + } + } + } + + /** + * Utility method that unwraps and returns appropriate exceptions. + * + * @param se + * ServiceException + * @return An instance of the actual exception, which will be a subclass of + * {@link TezException} or {@link IOException} + */ + public static Void unwrapAndThrowNonIOException(ServiceException se) + throws TezException { + try { + return unwrapAndThrowException(se); + } catch (IOException ioe) { + throw new TezException(ioe); + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 4e2ff40..47c8a8e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -353,6 +353,8 @@ public class DAGClientImpl extends DAGClient { dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. + } catch (IOException e) { + // can be either due to a n/w issue of due to AM completed. } if (dagStatus == null && !dagCompleted) { @@ -371,6 +373,8 @@ public class DAGClientImpl extends DAGClient { dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. + } catch (IOException e) { + // can be either due to a n/w issue of due to AM completed. } if (vertexStatus == null && !dagCompleted) { http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 223c0ab..240289c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -23,11 +23,11 @@ import java.util.Set; import javax.annotation.Nullable; +import org.apache.tez.common.RPCUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -52,6 +52,7 @@ import com.google.protobuf.ServiceException; @Private public class DAGClientRPCImpl extends DAGClient { + private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class); private static final String DAG_NOT_RUNNING_CLASS_NAME = @@ -96,6 +97,9 @@ public class DAGClientRPCImpl extends DAGClient { } catch (TezException e) { resetProxy(e); // create proxy again throw e; + } catch (IOException e) { + resetProxy(e); // create proxy again + throw e; } } @@ -113,6 +117,9 @@ public class DAGClientRPCImpl extends DAGClient { } catch (TezException e) { resetProxy(e); // create proxy again throw e; + } catch (IOException e) { + resetProxy(e); // create proxy again + throw e; } } @@ -176,22 +183,15 @@ public class DAGClientRPCImpl extends DAGClient { proxy.getDAGStatus(null, requestProtoBuilder.build()).getDagStatus(), DagStatusSource.AM); } catch (ServiceException e) { - final Throwable cause = e.getCause(); - if (cause instanceof RemoteException) { - RemoteException remoteException = (RemoteException) cause; - if (DAG_NOT_RUNNING_CLASS_NAME.equals(remoteException.getClassName())) { - throw new DAGNotRunningException(remoteException.getMessage()); - } - } - - // TEZ-151 retrieve wrapped TezException + RPCUtil.unwrapAndThrowException(e); + // Should not reach here throw new TezException(e); } } VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) - throws TezException { + throws TezException, IOException { if (LOG.isDebugEnabled()) { LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId + " vertex: " + vertexName); @@ -211,7 +211,8 @@ public class DAGClientRPCImpl extends DAGClient { proxy.getVertexStatus(null, requestProtoBuilder.build()).getVertexStatus()); } catch (ServiceException e) { - // TEZ-151 retrieve wrapped TezException + RPCUtil.unwrapAndThrowException(e); + // Should not reach here throw new TezException(e); } } http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java new file mode 100644 index 0000000..1e63b47 --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/common/TestRPCUtil.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.common; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezException; +import org.junit.Assert; + +import org.apache.hadoop.ipc.RemoteException; +import org.junit.Test; + +import com.google.protobuf.ServiceException; + +public class TestRPCUtil { + + @Test (timeout=1000) + public void testUnknownExceptionUnwrapping() { + Class<? extends Throwable> exception = TezException.class; + String className = "UnknownException.class"; + verifyRemoteExceptionUnwrapping(exception, className); + } + + @Test + public void testRemoteIOExceptionUnwrapping() { + Class<? extends Throwable> exception = IOException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteIOExceptionDerivativeUnwrapping() { + // Test IOException sub-class + Class<? extends Throwable> exception = FileNotFoundException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteTezExceptionUnwrapping() { + Class<? extends Throwable> exception = TezException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + + } + + @Test + public void testRemoteTezExceptionDerivativeUnwrapping() { + Class<? extends Throwable> exception = SessionNotRunning.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testRemoteRuntimeExceptionUnwrapping() { + Class<? extends Throwable> exception = NullPointerException.class; + verifyRemoteExceptionUnwrapping(exception, exception.getName()); + } + + @Test + public void testUnexpectedRemoteExceptionUnwrapping() { + // Non IOException, TezException thrown by the remote side. + Class<? extends Throwable> exception = Exception.class; + verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName()); + } + + @Test + public void testRemoteTezExceptionWithoutStringConstructor() { + // Derivatives of TezException should always define a string constructor. + Class<? extends Throwable> exception = TezTestExceptionNoConstructor.class; + verifyRemoteExceptionUnwrapping(RemoteException.class, exception.getName()); + } + + @Test + public void testRPCServiceExceptionUnwrapping() { + String message = "ServiceExceptionMessage"; + ServiceException se = new ServiceException(message); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue(IOException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + @Test + public void testRPCIOExceptionUnwrapping() { + String message = "DirectIOExceptionMessage"; + IOException ioException = new FileNotFoundException(message); + ServiceException se = new ServiceException(ioException); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + Assert.assertTrue(FileNotFoundException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + @Test + public void testRPCRuntimeExceptionUnwrapping() { + String message = "RPCRuntimeExceptionUnwrapping"; + RuntimeException re = new NullPointerException(message); + ServiceException se = new ServiceException(re); + + Throwable t = null; + try { + RPCUtil.unwrapAndThrowException(se); + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue(NullPointerException.class.isInstance(t)); + Assert.assertTrue(t.getMessage().contains(message)); + } + + private void verifyRemoteExceptionUnwrapping( + Class<? extends Throwable> expectedLocalException, + String realExceptionClassName) { + verifyRemoteExceptionUnwrapping(expectedLocalException, realExceptionClassName, true); + } + + private void verifyRemoteExceptionUnwrapping( + Class<? extends Throwable> expectedLocalException, + String realExceptionClassName, boolean allowIO) { + String message = realExceptionClassName + "Message"; + RemoteException re = new RemoteException(realExceptionClassName, message); + ServiceException se = new ServiceException(re); + + Throwable t = null; + try { + if (allowIO) { + RPCUtil.unwrapAndThrowException(se); + } else { + RPCUtil.unwrapAndThrowNonIOException(se); + } + } catch (Throwable thrown) { + t = thrown; + } + + Assert.assertTrue("Expected exception [" + expectedLocalException + + "] but found " + t, expectedLocalException.isInstance(t)); + Assert.assertTrue( + "Expected message [" + message + "] but found " + t.getMessage(), t + .getMessage().contains(message)); + } + + + @Test (timeout=1000) + public void testRemoteNonIOExceptionUnwrapping() { + Class<? extends Throwable> exception = TezException.class; + verifyRemoteExceptionUnwrapping(exception, IOException.class.getName(), false); + } + + + private static class TezTestExceptionNoConstructor extends + Exception { + private static final long serialVersionUID = 1L; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/28cd991b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 74efee2..04b0a03 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -235,6 +235,7 @@ public class TestAMRecovery { createDAG("VertexCompletelyFinished_Broadcast", ControlledImmediateStartVertexManager.class, DataMovementType.BROADCAST, false); TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED); + assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue()); assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue()); @@ -483,7 +484,7 @@ public class TestAMRecovery { "application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX); if (fs.exists(recoveryFilePath)) { - LOG.info("read recovery file:" + recoveryFilePath); + LOG.info("Read recovery file:" + recoveryFilePath); historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath))); } }
