Repository: tez Updated Branches: refs/heads/master 3f5a7f35d -> 2af886b50
TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2af886b5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2af886b5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2af886b5 Branch: refs/heads/master Commit: 2af886b509015200e1c04527275474cbc771c667 Parents: 3f5a7f3 Author: Hitesh Shah <[email protected]> Authored: Thu Mar 3 12:19:12 2016 -0800 Committer: Hitesh Shah <[email protected]> Committed: Thu Mar 3 12:19:12 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/client/DAGClient.java | 1 - .../tez/dag/api/client/DAGClientImpl.java | 24 +++- .../tez/dag/api/client/DAGClientInternal.java | 116 +++++++++++++++++++ .../dag/api/client/DAGClientTimelineImpl.java | 4 +- .../dag/api/client/rpc/DAGClientRPCImpl.java | 40 ++++--- 6 files changed, 169 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5a7ae58..0c6cacc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3151. Expose DAG credentials to plugins. TEZ-3149. Tez-tools: Add username in DagInfo. @@ -396,6 +397,7 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3149. Tez-tools: Add username in DagInfo. TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier. http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index 27b316b..9b11b96 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -26,7 +26,6 @@ import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.tez.dag.api.TezException; http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index fa22c32..af67ee8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -30,6 +30,8 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; + +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -58,7 +60,7 @@ public class DAGClientImpl extends DAGClient { private final FrameworkClient frameworkClient; @VisibleForTesting - protected DAGClient realClient; + protected DAGClientInternal realClient; private boolean dagCompleted = false; @VisibleForTesting protected boolean isATSEnabled = false; @@ -240,6 +242,9 @@ public class DAGClientImpl extends DAGClient { if (dagStatus.isCompleted()) { return dagStatus; } + } catch (ApplicationNotFoundException e) { + LOG.info("Failed to fetch DAG data for completed DAG from YARN Timeline" + + " - Application not found by YARN", e); } catch (TezException e) { if (LOG.isDebugEnabled()) { LOG.info("DAGStatus fetch failed." + e.getMessage()); @@ -291,6 +296,10 @@ public class DAGClientImpl extends DAGClient { if (vertexCompletionStates.contains(vertexStatus.getState())) { return vertexStatus; } + } catch (ApplicationNotFoundException e) { + LOG.info("Failed to fetch Vertex data for completed DAG from YARN Timeline" + + " - Application not found by YARN", e); + return null; } catch (TezException e) { if (LOG.isDebugEnabled()) { LOG.debug("ERROR fetching vertex data from Yarn Timeline. " + e.getMessage()); @@ -350,6 +359,10 @@ public class DAGClientImpl extends DAGClient { try { dagStatus = realClient.getDAGStatus(statusOptions, timeout); } catch (DAGNotRunningException e) { + LOG.info("DAG is no longer running", e); + dagCompleted = true; + } catch (ApplicationNotFoundException e) { + LOG.info("DAG is no longer running - application not found by YARN", e); dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. @@ -370,6 +383,10 @@ public class DAGClientImpl extends DAGClient { try { vertexStatus = realClient.getVertexStatus(vertexName, statusOptions); } catch (DAGNotRunningException e) { + LOG.info("DAG is no longer running", e); + dagCompleted = true; + } catch (ApplicationNotFoundException e) { + LOG.info("DAG is no longer running - application not found by YARN", e); dagCompleted = true; } catch (TezException e) { // can be either due to a n/w issue of due to AM completed. @@ -398,6 +415,9 @@ public class DAGClientImpl extends DAGClient { ApplicationReport appReport; try { appReport = frameworkClient.getApplicationReport(appId); + } catch (ApplicationNotFoundException e) { + LOG.info("DAG is no longer running - application not found by YARN", e); + throw new DAGNotRunningException(e); } catch (YarnException e) { throw new TezException(e); } @@ -592,7 +612,7 @@ public class DAGClientImpl extends DAGClient { } @VisibleForTesting - public DAGClient getRealClient() { + public DAGClientInternal getRealClient() { return realClient; } http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java new file mode 100644 index 0000000..bb236a3 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java @@ -0,0 +1,116 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.tez.dag.api.TezException; + +/** + * Private internal class for monitoring the <code>DAG</code> running in a Tez DAG + * Application Master. + */ +@Private +public abstract class DAGClientInternal implements Closeable { + + /** + * Gets DAG execution context for use with logging + * @return summary of DAG execution + */ + public abstract String getExecutionContext(); + + @Private + /** + * Get the YARN ApplicationReport for the app running the DAG. For performance + * reasons this may be stale copy and should be used to access static info. It + * may be null. + * @return <code>ApplicationReport</code> or null + */ + protected abstract ApplicationReport getApplicationReportInternal(); + + /** + * Get the status of the specified DAG + * @param statusOptions Optionally, retrieve additional information based on + * specified options. To retrieve basic information, this can be null + */ + public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) + throws IOException, TezException, ApplicationNotFoundException; + + /** + * Get the status of the specified DAG when it reaches a final state, or the timeout expires. + * + * @param statusOptions Optionally, retrieve additional information based on + * specified options. To retrieve basic information, this can be null + * @param timeout RPC call timeout. Value -1 waits for infinite and returns when + * DAG reaches final state + * @return DAG Status + * @throws IOException + * @throws TezException + */ + public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions, + long timeout) + throws IOException, TezException, ApplicationNotFoundException; + + /** + * Get the status of a Vertex of a DAG + * @param statusOptions Optionally, retrieve additional information based on + * specified options + */ + public abstract VertexStatus getVertexStatus(String vertexName, + Set<StatusGetOpts> statusOptions) + throws IOException, TezException, ApplicationNotFoundException; + + /** + * Kill a running DAG + * + */ + public abstract void tryKillDAG() throws IOException, TezException; + + /** + * Wait for DAG to complete without printing any vertex statuses + * + * @return Final DAG Status + * @throws IOException + * @throws TezException + * @throws InterruptedException + */ + public abstract DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException; + + /** + * Wait for DAG to complete and periodically print *all* vertices' status. + * + * @param statusGetOpts + * : status get options. For example, to get counter pass + * <code>EnumSet.of(StatusGetOpts.GET_COUNTERS)</code> + * @return Final DAG Status + * @throws IOException + * @throws TezException + * @throws InterruptedException + */ + public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) + throws IOException, TezException, InterruptedException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java index 2294759..ffd91b7 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java @@ -37,6 +37,8 @@ import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.UniformInterfaceException; import com.sun.jersey.api.client.WebResource; + +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -63,7 +65,7 @@ import org.codehaus.jettison.json.JSONObject; @Private -public class DAGClientTimelineImpl extends DAGClient { +public class DAGClientTimelineImpl extends DAGClientInternal { private static final Logger LOG = LoggerFactory.getLogger(DAGClientTimelineImpl.class); private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo"; http://git-wip-us.apache.org/repos/asf/tez/blob/2af886b5/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 240289c..ff48755 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -23,7 +23,10 @@ import java.util.Set; import javax.annotation.Nullable; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.tez.common.RPCUtil; +import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.client.DAGClientInternal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -51,7 +54,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; @Private -public class DAGClientRPCImpl extends DAGClient { +public class DAGClientRPCImpl extends DAGClientInternal { private static final Logger LOG = LoggerFactory.getLogger(DAGClientRPCImpl.class); @@ -82,15 +85,15 @@ public class DAGClientRPCImpl extends DAGClient { @Override public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions) - throws IOException, TezException { + throws IOException, TezException, ApplicationNotFoundException { return getDAGStatus(statusOptions, 0); } @Override public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions, - long timeout) throws IOException, TezException { - if(createAMProxyIfNeeded()) { + long timeout) throws IOException, TezException, ApplicationNotFoundException { + if (createAMProxyIfNeeded()) { try { DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout); return dagStatus; @@ -110,7 +113,8 @@ public class DAGClientRPCImpl extends DAGClient { @Override public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) - throws IOException, TezException { + throws IOException, TezException, ApplicationNotFoundException { + if(createAMProxyIfNeeded()) { try { return getVertexStatusViaAM(vertexName, statusOptions); @@ -133,14 +137,18 @@ public class DAGClientRPCImpl extends DAGClient { if(LOG.isDebugEnabled()) { LOG.debug("TryKill for app: " + appId + " dag:" + dagId); } - if(createAMProxyIfNeeded()) { - TryKillDAGRequestProto requestProto = - TryKillDAGRequestProto.newBuilder().setDagId(dagId).build(); - try { - proxy.tryKillDAG(null, requestProto); - } catch (ServiceException e) { - resetProxy(e); + try { + if (createAMProxyIfNeeded()) { + TryKillDAGRequestProto requestProto = + TryKillDAGRequestProto.newBuilder().setDagId(dagId).build(); + try { + proxy.tryKillDAG(null, requestProto); + } catch (ServiceException e) { + resetProxy(e); + } } + } catch (ApplicationNotFoundException e) { + throw new SessionNotRunning("Application already completed"); } } @@ -217,7 +225,8 @@ public class DAGClientRPCImpl extends DAGClient { } } - ApplicationReport getAppReport() throws IOException, TezException { + ApplicationReport getAppReport() throws IOException, TezException, + ApplicationNotFoundException { try { ApplicationReport appReport = frameworkClient.getApplicationReport(appId); if (LOG.isDebugEnabled()) { @@ -225,12 +234,15 @@ public class DAGClientRPCImpl extends DAGClient { + appReport.getYarnApplicationState()); } return appReport; + } catch (ApplicationNotFoundException e) { + throw e; } catch (YarnException e) { throw new TezException(e); } } - boolean createAMProxyIfNeeded() throws IOException, TezException { + boolean createAMProxyIfNeeded() throws IOException, TezException, + ApplicationNotFoundException { if(proxy != null) { // if proxy exist optimistically use it assuming there is no retry return true;
