Repository: tez
Updated Branches:
refs/heads/branch-0.7 84922f834 -> 765c75b37
TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about
the application. (hitesh)
(cherry picked from commit 2af886b509015200e1c04527275474cbc771c667)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/765c75b3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/765c75b3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/765c75b3
Branch: refs/heads/branch-0.7
Commit: 765c75b37af1d213a9d01f72ac9735cc1af9021b
Parents: 84922f8
Author: Hitesh Shah <[email protected]>
Authored: Thu Mar 3 12:19:12 2016 -0800
Committer: Hitesh Shah <[email protected]>
Committed: Thu Mar 3 12:20:07 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/client/DAGClient.java | 1 -
.../tez/dag/api/client/DAGClientImpl.java | 24 +++-
.../tez/dag/api/client/DAGClientInternal.java | 116 +++++++++++++++++++
.../dag/api/client/DAGClientTimelineImpl.java | 4 +-
.../dag/api/client/rpc/DAGClientRPCImpl.java | 40 ++++---
6 files changed, 168 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b73fd72..e73329e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application.
TEZ-3115. Shuffle string handling adds significant memory overhead
TEZ-3149. Tez-tools: Add username in DagInfo
TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
index 27b316b..9b11b96 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -26,7 +26,6 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.tez.dag.api.TezException;
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
index fa22c32..af67ee8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java
@@ -30,6 +30,8 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -58,7 +60,7 @@ public class DAGClientImpl extends DAGClient {
private final FrameworkClient frameworkClient;
@VisibleForTesting
- protected DAGClient realClient;
+ protected DAGClientInternal realClient;
private boolean dagCompleted = false;
@VisibleForTesting
protected boolean isATSEnabled = false;
@@ -240,6 +242,9 @@ public class DAGClientImpl extends DAGClient {
if (dagStatus.isCompleted()) {
return dagStatus;
}
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("Failed to fetch DAG data for completed DAG from YARN
Timeline"
+ + " - Application not found by YARN", e);
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.info("DAGStatus fetch failed." + e.getMessage());
@@ -291,6 +296,10 @@ public class DAGClientImpl extends DAGClient {
if (vertexCompletionStates.contains(vertexStatus.getState())) {
return vertexStatus;
}
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("Failed to fetch Vertex data for completed DAG from YARN
Timeline"
+ + " - Application not found by YARN", e);
+ return null;
} catch (TezException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ERROR fetching vertex data from Yarn Timeline. " +
e.getMessage());
@@ -350,6 +359,10 @@ public class DAGClientImpl extends DAGClient {
try {
dagStatus = realClient.getDAGStatus(statusOptions, timeout);
} catch (DAGNotRunningException e) {
+ LOG.info("DAG is no longer running", e);
+ dagCompleted = true;
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
@@ -370,6 +383,10 @@ public class DAGClientImpl extends DAGClient {
try {
vertexStatus = realClient.getVertexStatus(vertexName, statusOptions);
} catch (DAGNotRunningException e) {
+ LOG.info("DAG is no longer running", e);
+ dagCompleted = true;
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
@@ -398,6 +415,9 @@ public class DAGClientImpl extends DAGClient {
ApplicationReport appReport;
try {
appReport = frameworkClient.getApplicationReport(appId);
+ } catch (ApplicationNotFoundException e) {
+ LOG.info("DAG is no longer running - application not found by YARN", e);
+ throw new DAGNotRunningException(e);
} catch (YarnException e) {
throw new TezException(e);
}
@@ -592,7 +612,7 @@ public class DAGClientImpl extends DAGClient {
}
@VisibleForTesting
- public DAGClient getRealClient() {
+ public DAGClientInternal getRealClient() {
return realClient;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
new file mode 100644
index 0000000..bb236a3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientInternal.java
@@ -0,0 +1,116 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * Private internal class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Application Master.
+ *
+ * <p>The status queries on this class declare
+ * {@link ApplicationNotFoundException} so that callers (e.g. DAGClientImpl)
+ * can detect that YARN no longer knows about the application and treat the
+ * DAG as no longer running, instead of repeatedly contacting the RM.
+ */
+@Private
+public abstract class DAGClientInternal implements Closeable {
+
+ /**
+ * Gets DAG execution context for use with logging
+ * @return summary of DAG execution
+ */
+ public abstract String getExecutionContext();
+
+ @Private
+ /**
+ * Get the YARN ApplicationReport for the app running the DAG. For performance
+ * reasons this may be a stale copy and should only be used to access static
+ * info. It may be null.
+ *
+ * NOTE(review): this doc comment is placed after the {@code @Private}
+ * annotation, so javadoc/javac will not attach it to the method (it is a
+ * dangling doc comment). Consider moving {@code @Private} below this comment.
+ * @return <code>ApplicationReport</code> or null
+ */
+ protected abstract ApplicationReport getApplicationReportInternal();
+
+ /**
+ * Get the status of the specified DAG
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options. To retrieve basic information, this can be null
+ * @return DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws ApplicationNotFoundException if YARN does not know about the
+ * application running the DAG
+ */
+ public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts>
statusOptions)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Get the status of the specified DAG when it reaches a final state, or the
+ * timeout expires.
+ *
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options. To retrieve basic information, this can be null
+ * @param timeout RPC call timeout. Value -1 waits for infinite and returns
+ * when DAG reaches final state
+ * @return DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws ApplicationNotFoundException if YARN does not know about the
+ * application running the DAG
+ */
+ public abstract DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts>
statusOptions,
+ long timeout)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Get the status of a Vertex of a DAG
+ * @param vertexName name of the vertex whose status is requested
+ * @param statusOptions Optionally, retrieve additional information based on
+ * specified options
+ * @return Vertex Status
+ * @throws IOException
+ * @throws TezException
+ * @throws ApplicationNotFoundException if YARN does not know about the
+ * application running the DAG
+ */
+ public abstract VertexStatus getVertexStatus(String vertexName,
+ Set<StatusGetOpts> statusOptions)
+ throws IOException, TezException, ApplicationNotFoundException;
+
+ /**
+ * Kill a running DAG
+ *
+ * Unlike the status queries, this method does not declare
+ * {@link ApplicationNotFoundException}; implementations are expected to map
+ * that case to a {@link TezException} subtype (the RPC implementation, for
+ * example, throws SessionNotRunning).
+ * @throws IOException
+ * @throws TezException
+ */
+ public abstract void tryKillDAG() throws IOException, TezException;
+
+ /**
+ * Wait for DAG to complete without printing any vertex statuses
+ *
+ * @return Final DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ public abstract DAGStatus waitForCompletion() throws IOException,
TezException, InterruptedException;
+
+ /**
+ * Wait for DAG to complete and periodically print *all* vertices' status.
+ *
+ * @param statusGetOpts
+ * : status get options; may be null. For example, to get counters pass
+ * <code>EnumSet.of(StatusGetOpts.GET_COUNTERS)</code>
+ * @return Final DAG Status
+ * @throws IOException
+ * @throws TezException
+ * @throws InterruptedException
+ */
+ public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable
Set<StatusGetOpts> statusGetOpts)
+ throws IOException, TezException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
index 2294759..ffd91b7 100644
---
a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
+++
b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientTimelineImpl.java
@@ -37,6 +37,8 @@ import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
+
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
@Private
-public class DAGClientTimelineImpl extends DAGClient {
+public class DAGClientTimelineImpl extends DAGClientInternal {
private static final Logger LOG =
LoggerFactory.getLogger(DAGClientTimelineImpl.class);
private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
http://git-wip-us.apache.org/repos/asf/tez/blob/765c75b3/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git
a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 240289c..ff48755 100644
---
a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -23,7 +23,10 @@ import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.tez.common.RPCUtil;
+import org.apache.tez.dag.api.SessionNotRunning;
+import org.apache.tez.dag.api.client.DAGClientInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -51,7 +54,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
@Private
-public class DAGClientRPCImpl extends DAGClient {
+public class DAGClientRPCImpl extends DAGClientInternal {
private static final Logger LOG =
LoggerFactory.getLogger(DAGClientRPCImpl.class);
@@ -82,15 +85,15 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions)
- throws IOException, TezException {
+ throws IOException, TezException, ApplicationNotFoundException {
return getDAGStatus(statusOptions, 0);
}
@Override
public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions,
- long timeout) throws IOException, TezException {
- if(createAMProxyIfNeeded()) {
+ long timeout) throws IOException, TezException,
ApplicationNotFoundException {
+ if (createAMProxyIfNeeded()) {
try {
DAGStatus dagStatus = getDAGStatusViaAM(statusOptions, timeout);
return dagStatus;
@@ -110,7 +113,8 @@ public class DAGClientRPCImpl extends DAGClient {
@Override
public VertexStatus getVertexStatus(String vertexName,
Set<StatusGetOpts> statusOptions)
- throws IOException, TezException {
+ throws IOException, TezException, ApplicationNotFoundException {
+
if(createAMProxyIfNeeded()) {
try {
return getVertexStatusViaAM(vertexName, statusOptions);
@@ -133,14 +137,18 @@ public class DAGClientRPCImpl extends DAGClient {
if(LOG.isDebugEnabled()) {
LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
}
- if(createAMProxyIfNeeded()) {
- TryKillDAGRequestProto requestProto =
- TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
- try {
- proxy.tryKillDAG(null, requestProto);
- } catch (ServiceException e) {
- resetProxy(e);
+ try {
+ if (createAMProxyIfNeeded()) {
+ TryKillDAGRequestProto requestProto =
+ TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+ try {
+ proxy.tryKillDAG(null, requestProto);
+ } catch (ServiceException e) {
+ resetProxy(e);
+ }
}
+ } catch (ApplicationNotFoundException e) {
+ throw new SessionNotRunning("Application already completed");
}
}
@@ -217,7 +225,8 @@ public class DAGClientRPCImpl extends DAGClient {
}
}
- ApplicationReport getAppReport() throws IOException, TezException {
+ ApplicationReport getAppReport() throws IOException, TezException,
+ ApplicationNotFoundException {
try {
ApplicationReport appReport =
frameworkClient.getApplicationReport(appId);
if (LOG.isDebugEnabled()) {
@@ -225,12 +234,15 @@ public class DAGClientRPCImpl extends DAGClient {
+ appReport.getYarnApplicationState());
}
return appReport;
+ } catch (ApplicationNotFoundException e) {
+ throw e;
} catch (YarnException e) {
throw new TezException(e);
}
}
- boolean createAMProxyIfNeeded() throws IOException, TezException {
+ boolean createAMProxyIfNeeded() throws IOException, TezException,
+ ApplicationNotFoundException {
if(proxy != null) {
// if proxy exist optimistically use it assuming there is no retry
return true;