Updated Branches: refs/heads/TEZ-1 37d7a4cc3 -> ef37823c9
TEZ-9. Support querying state of the DAG AM from a client via a client RPC (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ef37823c Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ef37823c Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ef37823c Branch: refs/heads/TEZ-1 Commit: ef37823c90efedb5a0b14f94b4e9e785cb1c5071 Parents: 37d7a4c Author: Bikas Saha <[email protected]> Authored: Tue May 21 18:28:41 2013 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue May 21 18:28:41 2013 -0700 ---------------------------------------------------------------------- tez-dag-api/pom.xml | 15 ++ .../main/java/org/apache/tez/client/TezClient.java | 95 +++++++++++++ .../org/apache/tez/dag/api/TezConfiguration.java | 11 ++ .../java/org/apache/tez/dag/api/TezException.java | 2 +- .../org/apache/tez/dag/api/TezRemoteException.java | 31 +++++ .../org/apache/tez/dag/api/client/DAGClient.java | 10 +- .../org/apache/tez/dag/api/client/Progress.java | 9 ++ .../tez/dag/api/client/rpc/DAGClientRPCImpl.java | 18 ++- .../apache/tez/dag/api/client/DAGClientServer.java | 106 +++++++++++++++ .../DAGClientAMProtocolBlockingPBServerImpl.java | 103 ++++++++++++++ .../java/org/apache/tez/dag/app/DAGAppMaster.java | 49 ++++--- .../apache/tez/dag/app/client/ClientService.java | 29 ---- .../tez/dag/app/client/impl/TezClientService.java | 43 ------ .../apache/tez/dag/app/client/package-info.java | 20 --- .../tez/dag/app/rm/TaskSchedulerEventHandler.java | 31 +++-- .../tez/mapreduce/ClientServiceDelegate.java | 70 ++++++++-- .../org/apache/tez/mapreduce/DAGJobStatus.java | 27 ++++- 17 files changed, 517 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/pom.xml ---------------------------------------------------------------------- diff 
--git a/tez-dag-api/pom.xml b/tez-dag-api/pom.xml index 74b1284..3cb0952 100644 --- a/tez-dag-api/pom.xml +++ b/tez-dag-api/pom.xml @@ -34,6 +34,10 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-common</artifactId> </dependency> </dependencies> @@ -46,6 +50,17 @@ <configuration> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.tez.client.TezClient</mainClass> + </manifest> + </archive> + </configuration> + </plugin> <plugin> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-maven-plugins</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java new file mode 100644 index 0000000..f794014 --- /dev/null +++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java @@ -0,0 +1,95 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.YarnClient; +import org.apache.hadoop.yarn.client.YarnClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezRemoteException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl; + +public class TezClient { + private final TezConfiguration conf; + + public TezClient(TezConfiguration conf) { + this.conf = conf; + } + + public DAGClient getDAGClient(String appIdStr) throws IOException, TezException { + try { + System.out.println("Fetching app: " + appIdStr); + ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); + YarnClient yarnClient = new YarnClientImpl(); + yarnClient.init(conf); + yarnClient.start(); + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + String host = appReport.getHost(); + int port = appReport.getRpcPort(); + return getDAGClient(host, port); + } catch (YarnRemoteException e) { + throw new TezException(e); + } + } + + public DAGClient getDAGClient(String host, int port) throws IOException { + System.out.println("App host port: " + host + ":" + port); + InetSocketAddress addr = new InetSocketAddress(host, port); + DAGClient dagClient; + dagClient = new DAGClientRPCImpl(1, addr, conf); + return dagClient; + } + + public static void main(String[] args) 
{ + try { + TezClient tezClient = new TezClient( + new TezConfiguration(new YarnConfiguration())); + DAGClient dagClient = tezClient.getDAGClient(args[1]); + String dagId = dagClient.getAllDAGs().get(0); + DAGStatus dagStatus = dagClient.getDAGStatus(dagId); + System.out.println("DAG: " + dagId + + " State: " + dagStatus.getState() + + " Progress: " + dagStatus.getDAGProgress()); + for(String vertexName : dagStatus.getVertexProgress().keySet()) { + System.out.println("VertexStatus from DagStatus:" + + " Vertex: " + vertexName + + " Progress: " + dagStatus.getVertexProgress().get(vertexName)); + VertexStatus vertexStatus = dagClient.getVertexStatus(dagId, vertexName); + System.out.println("VertexStatus:" + + " Vertex: " + vertexName + + " State: " + vertexStatus.getState() + + " Progress: " + vertexStatus.getProgress()); + } + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 597fb2f..4a407b3 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -65,6 +65,17 @@ public class TezConfiguration extends Configuration { public static final String DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_PREFIX + "node-blacklisting.ignore-threshold-node-percent"; public static final int DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33; + + /** Number of threads to handle job client RPC requests.*/ + public static final String DAG_CLIENT_AM_THREAD_COUNT = + TEZ_PREFIX + "client.am.thread-count"; + public static final int DAG_CLIENT_AM__THREAD_COUNT_DEFAULT = 1; + /** 
+ * Range of ports that the AM can use when binding. Leave blank + * if you want all possible ports. + */ + public static final String DAG_CLIENT_AM_PORT_RANGE = + TEZ_PREFIX + "client.am.port-range"; public static final String DAG_AM_RESOURCE_MEMORY_MB = DAG_AM_PREFIX http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java index 265b89c..3014309 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezException.java @@ -18,7 +18,7 @@ package org.apache.tez.dag.api; -/* +/** * Base Tez Exception */ public class TezException extends RuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java new file mode 100644 index 0000000..8311852 --- /dev/null +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezRemoteException.java @@ -0,0 +1,31 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api; + +/** + * Base TezRemoteException + */ +public class TezRemoteException extends Exception { + private static final long serialVersionUID = 6337442733802964447L; + public TezRemoteException(Throwable cause) { super(cause); } + public TezRemoteException(String message) { super(message); } + public TezRemoteException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index 477b017..0e9e55b 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -18,9 +18,10 @@ package org.apache.tez.dag.api.client; +import java.io.IOException; import java.util.List; -import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezRemoteException; /* * Inteface class for monitoring the <code>DAG</code> running in a Tez DAG @@ -31,16 +32,17 @@ public interface DAGClient { /** * Return the identifiers for all DAG's */ - List<String> getAllDAGs() throws TezException; + List<String> getAllDAGs() throws IOException, TezRemoteException; /** * Get the status of a DAG */ - DAGStatus getDAGStatus(String dagId) throws TezException; + DAGStatus 
getDAGStatus(String dagId) throws IOException, TezRemoteException; /** * Get the status of a Vertex of a DAG */ VertexStatus getVertexStatus(String dagId, - String vertexName) throws TezException; + String vertexName) + throws IOException, TezRemoteException; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java index 7158e30..af9f2a4 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java @@ -47,5 +47,14 @@ public class Progress { public int getKilledTaskCount() { return proxy.getKilledTaskCount(); } + + @Override + public String toString() { + return new String("Total: " + getTotalTaskCount() + + " Succeeded: " + getSucceededTaskCount() + + " Running: " + getRunningTaskCount() + + " Failed: " + getFailedTaskCount() + + " Killed: " + getKilledTaskCount()); + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 9238f63..42a815e 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.tez.dag.api.TezConfiguration; -import 
org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.TezRemoteException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.VertexStatus; @@ -55,18 +55,20 @@ public class DAGClientRPCImpl implements DAGClient, Closeable { } @Override - public List<String> getAllDAGs() throws TezException { + public List<String> getAllDAGs() throws IOException, TezRemoteException { GetAllDAGsRequestProto requestProto = GetAllDAGsRequestProto.newBuilder().build(); try { return proxy.getAllDAGs(null, requestProto).getDagIdList(); } catch (ServiceException e) { - throw new TezException(e); + // TEZ-151 retrieve wrapped TezRemoteException + throw new TezRemoteException(e); } } @Override - public DAGStatus getDAGStatus(String dagId) throws TezException { + public DAGStatus getDAGStatus(String dagId) + throws IOException, TezRemoteException { GetDAGStatusRequestProto requestProto = GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build(); @@ -74,13 +76,14 @@ public class DAGClientRPCImpl implements DAGClient, Closeable { return new DAGStatus( proxy.getDAGStatus(null, requestProto).getDagStatus()); } catch (ServiceException e) { - throw new TezException(e); + // TEZ-151 retrieve wrapped TezRemoteException + throw new TezRemoteException(e); } } @Override public VertexStatus getVertexStatus(String dagId, String vertexName) - throws TezException { + throws IOException, TezRemoteException { GetVertexStatusRequestProto requestProto = GetVertexStatusRequestProto.newBuilder(). 
setDagId(dagId).setVertexName(vertexName).build(); @@ -89,7 +92,8 @@ public class DAGClientRPCImpl implements DAGClient, Closeable { return new VertexStatus( proxy.getVertexStatus(null, requestProto).getVertexStatus()); } catch (ServiceException e) { - throw new TezException(e); + // TEZ-151 retrieve wrapped TezRemoteException + throw new TezRemoteException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java new file mode 100644 index 0000000..534fcaa --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java @@ -0,0 +1,106 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.tez.dag.api.client; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPBServerImpl; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol; + +import com.google.protobuf.BlockingService; + +public class DAGClientServer extends AbstractService { + static final Log LOG = LogFactory.getLog(DAGClientServer.class); + + DAGClient realInstance; + Server server; + InetSocketAddress bindAddress; + + public DAGClientServer(DAGClient realInstance) { + super("DAGClientRPCServer"); + this.realInstance = realInstance; + } + + @Override + public void start() { + try { + assert getConfig() instanceof TezConfiguration; + TezConfiguration conf = (TezConfiguration) getConfig(); + InetSocketAddress addr = new InetSocketAddress(0); + + DAGClientAMProtocolBlockingPBServerImpl service = + new DAGClientAMProtocolBlockingPBServerImpl(realInstance); + + BlockingService blockingService = + DAGClientAMProtocol.newReflectiveBlockingService(service); + + int numHandlers = conf.getInt(TezConfiguration.DAG_CLIENT_AM_THREAD_COUNT, + TezConfiguration.DAG_CLIENT_AM__THREAD_COUNT_DEFAULT); + + String portRange = conf.get(TezConfiguration.DAG_CLIENT_AM_PORT_RANGE); + + server = createServer(DAGClientAMProtocolBlockingPB.class, addr, conf, + numHandlers, blockingService, portRange); + server.start(); + bindAddress = 
NetUtils.getConnectAddress(server); + LOG.info("Instantiated DAGClientRPCServer at " + bindAddress); + super.start(); + } catch (Exception e) { + LOG.error("Failed to start DAGClientServer: ", e); + throw new TezException(e); + } + } + + @Override + public void stop() { + if(server != null) { + server.stop(); + } + super.stop(); + } + + public InetSocketAddress getBindAddress() { + return bindAddress; + } + + private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf, + int numHandlers, + BlockingService blockingService, String portRangeConfig) throws IOException { + RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class); + RPC.Server server = new RPC.Builder(conf).setProtocol(pbProtocol) + .setInstance(blockingService).setBindAddress(addr.getHostName()) + .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false) + .setPortRangeConfig(portRangeConfig) + .build(); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService); + return server; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java new file mode 100644 index 0000000..81d7868 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -0,0 +1,103 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api.client.rpc; + +import java.io.IOException; +import java.util.List; + +import org.apache.tez.dag.api.TezRemoteException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.DAGStatusBuilder; +import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.dag.api.client.VertexStatusBuilder; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAllDAGsResponseProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +public class DAGClientAMProtocolBlockingPBServerImpl implements + DAGClientAMProtocolBlockingPB { + + DAGClient real; + + public DAGClientAMProtocolBlockingPBServerImpl(DAGClient real) { + this.real = real; + } + + @Override + public GetAllDAGsResponseProto getAllDAGs(RpcController 
controller, + GetAllDAGsRequestProto request) throws ServiceException { + try{ + List<String> dagIds = real.getAllDAGs(); + return GetAllDAGsResponseProto.newBuilder().addAllDagId(dagIds).build(); + } catch(TezRemoteException e) { + throw wrapException(e); + } catch(IOException e) { + throw wrapException(e); + } + } + + @Override + public GetDAGStatusResponseProto getDAGStatus(RpcController controller, + GetDAGStatusRequestProto request) throws ServiceException { + try { + String dagId = request.getDagId(); + DAGStatus status; + status = real.getDAGStatus(dagId); + assert status instanceof DAGStatusBuilder; + DAGStatusBuilder builder = (DAGStatusBuilder) status; + return GetDAGStatusResponseProto.newBuilder(). + setDagStatus(builder.getProto()).build(); + } catch (TezRemoteException e) { + throw wrapException(e); + } catch(IOException e) { + throw wrapException(e); + } + } + + @Override + public GetVertexStatusResponseProto getVertexStatus(RpcController controller, + GetVertexStatusRequestProto request) throws ServiceException { + try { + String dagId = request.getDagId(); + String vertexName = request.getVertexName(); + VertexStatus status = real.getVertexStatus(dagId, vertexName); + assert status instanceof VertexStatusBuilder; + VertexStatusBuilder builder = (VertexStatusBuilder) status; + return GetVertexStatusResponseProto.newBuilder(). 
+ setVertexStatus(builder.getProto()).build(); + } catch (TezRemoteException e) { + throw wrapException(e); + } catch(IOException e) { + throw wrapException(e); + } + } + + ServiceException wrapException(Exception e){ + return new ServiceException(e); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 804c20c..821dd71 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -67,13 +67,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.app.client.ClientService; -import org.apache.tez.dag.app.client.impl.TezClientService; +import org.apache.tez.dag.api.TezRemoteException; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; @@ -157,7 +156,6 @@ public class DAGAppMaster extends CompositeService { private AppContext context; private TezConfiguration conf; private Dispatcher dispatcher; - private ClientService clientService; // TODO Recovery //private Recovery recoveryServ; private ContainerLauncher containerLauncher; @@ -178,7 +176,9 @@ public class DAGAppMaster extends CompositeService { private HistoryEventHandler 
historyEventHandler; private DAGAppMasterState state; - private DAGMonitorServer monitor; + + DAGClientServer clientRpcServer; + private DAGClientHandler clientHandler; private DAG dag; private Credentials fsTokens = new Credentials(); // Filled during init @@ -230,13 +230,16 @@ public class DAGAppMaster extends CompositeService { dagId = new TezDAGID(appAttemptID.getApplicationId(), 1); - monitor = new DAGMonitorServer(); + clientHandler = new DAGClientHandler(); // TODO Committer. // committer = createOutputCommitter(conf); dispatcher = createDispatcher(); addIfService(dispatcher); + + clientRpcServer = new DAGClientServer(clientHandler); + addIfService(clientRpcServer); taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf); addIfService(taskHeartbeatHandler); @@ -262,11 +265,6 @@ public class DAGAppMaster extends CompositeService { taskCleaner = createTaskCleaner(context); addIfService(taskCleaner); - // TODO TEZ-9 - //service to handle requests from JobClient - clientService = new TezClientService(); - addIfService(clientService); - this.dagEventDispatcher = new DagEventDispatcher(); this.vertexEventDispatcher = new VertexEventDispatcher(); @@ -287,7 +285,7 @@ public class DAGAppMaster extends CompositeService { dispatcher.register(NMCommunicatorEventType.class, containerLauncher); taskSchedulerEventHandler = new TaskSchedulerEventHandler(context, - clientService); + clientRpcServer); addIfService(taskSchedulerEventHandler); dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler); @@ -712,30 +710,37 @@ public class DAGAppMaster extends CompositeService { LOG.info("On DAG completion. 
Old state: " + oldState + " new state: " + state); } - class DAGMonitorServer implements DAGClient { + class DAGClientHandler implements DAGClient { @Override - public List<String> getAllDAGs() { + public List<String> getAllDAGs() throws TezRemoteException { return Collections.singletonList(dag.getID().toString()); } @Override - public DAGStatus getDAGStatus(String dagIdStr) throws TezException { + public DAGStatus getDAGStatus(String dagIdStr) + throws IOException, TezRemoteException { return getDAG(dagIdStr).getDAGStatus(); } @Override - public VertexStatus getVertexStatus(String dagIdStr, String vertexName) { - return getDAG(dagIdStr).getVertexStatus(vertexName); + public VertexStatus getVertexStatus(String dagIdStr, String vertexName) + throws IOException, TezRemoteException{ + VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName); + if(status == null) { + throw new TezRemoteException("Unknown vertexName: " + vertexName); + } + + return status; } - DAG getDAG(String dagIdStr) { + DAG getDAG(String dagIdStr) throws IOException, TezRemoteException { TezDAGID dagId = TezDAGID.fromString(dagIdStr); if(dagId == null) { - throw new TezException("Bad dagId: " + dagIdStr); + throw new TezRemoteException("Bad dagId: " + dagIdStr); } if(!dagId.equals(dag.getID())) { - throw new TezException("Unknown dagId: " + dagIdStr); + throw new TezRemoteException("Unknown dagId: " + dagIdStr); } return dag; } @@ -1083,7 +1088,9 @@ public class DAGAppMaster extends CompositeService { // that it doesnt take too long in shutting down // Signal the task scheduler. - appMaster.taskSchedulerEventHandler.setSignalled(true); + if(appMaster.getServiceState() == STATE.STARTED) { + appMaster.taskSchedulerEventHandler.setSignalled(true); + } if (EnumSet.of(DAGAppMasterState.NEW, DAGAppMasterState.INITED, DAGAppMasterState.RUNNING).contains(appMaster.state)) { // DAG not in a final state. 
Must have receive a KILL signal http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java deleted file mode 100644 index 64937a9..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/ClientService.java +++ /dev/null @@ -1,29 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.tez.dag.app.client; - -import java.net.InetSocketAddress; - -// TODONOTES - RPC service for clients -public interface ClientService { - - InetSocketAddress getBindAddress(); - - int getHttpPort(); -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java deleted file mode 100644 index 8fd79f0..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/impl/TezClientService.java +++ /dev/null @@ -1,43 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.tez.dag.app.client.impl; - -import java.net.InetSocketAddress; - -import org.apache.tez.dag.app.client.ClientService; - -public class TezClientService implements ClientService { - - // TODO remove dummy client service - private final InetSocketAddress dummySocketAddress = - new InetSocketAddress(0); - - @Override - public InetSocketAddress getBindAddress() { - // TODO Auto-generated method stub - return dummySocketAddress; - } - - @Override - public int getHttpPort() { - // TODO Auto-generated method stub - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java b/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java deleted file mode 100644 index a4cd9cc..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/client/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -@InterfaceAudience.Private -package org.apache.tez.dag.app.client; -import org.apache.hadoop.classification.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index 85ba0b2..c61cbcc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -38,11 +38,11 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.DAGAppMaster; import org.apache.tez.dag.app.DAGAppMasterState; -import org.apache.tez.dag.app.client.ClientService; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; @@ -69,31 +69,24 @@ public class TaskSchedulerEventHandler extends AbstractService protected final AppContext appContext; @SuppressWarnings("rawtypes") private final EventHandler eventHandler; - private final TaskScheduler taskScheduler; - // TODO change this to DAGAppMaster + private TaskScheduler taskScheduler; private DAGAppMaster dagAppMaster; private Map<ApplicationAccessType, String> appAcls = null; private Thread eventHandlingThread; private volatile boolean stopEventHandling; // Has a signal (SIGTERM etc) been issued? 
protected volatile boolean isSignalled = false; + final DAGClientServer clientService; BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>(); public TaskSchedulerEventHandler(AppContext appContext, - ClientService clientService) { + DAGClientServer clientService) { super(TaskSchedulerEventHandler.class.getName()); this.appContext = appContext; - eventHandler = appContext.getEventHandler(); - InetSocketAddress serviceAddr = clientService.getBindAddress(); - taskScheduler = - new TaskScheduler(appContext.getApplicationAttemptId(), - this, - serviceAddr.getHostName(), - serviceAddr.getPort(), - serviceAddr.getHostName() + - ":" + clientService.getHttpPort()); + this.eventHandler = appContext.getEventHandler(); + this.clientService = clientService; } public Map<ApplicationAccessType, String> getApplicationAcls() { @@ -346,14 +339,22 @@ public class TaskSchedulerEventHandler extends AbstractService @Override public synchronized void init(Configuration conf) { super.init(conf); - taskScheduler.init(conf); - // todo set heartbeat value from conf here } @Override public synchronized void start() { // FIXME hack alert how is this supposed to support multiple DAGs? // Answer: this is shared across dags. 
need job==app-dag-master + // TODO set heartbeat value from conf here + InetSocketAddress serviceAddr = clientService.getBindAddress(); + taskScheduler = + new TaskScheduler(appContext.getApplicationAttemptId(), + this, + serviceAddr.getHostName(), + serviceAddr.getPort(), + ""); + taskScheduler.init(getConfig()); + dagAppMaster = appContext.getAppMaster(); taskScheduler.start(); this.eventHandlingThread = new Thread() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java index 9cdc845..44ccc07 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java @@ -18,34 +18,47 @@ package org.apache.tez.mapreduce; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.v2.LogParams; import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezRemoteException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; import java.io.IOException; public class ClientServiceDelegate { + private static final Log LOG = 
LogFactory.getLog(ClientServiceDelegate.class); - private final Configuration conf; + private final TezConfiguration conf; private final ResourceMgrDelegate rm; + private DAGClient dagClient; + private String currentDAGId; + private TezClient tezClient; + private ApplicationReport appReport; // FIXME // how to handle completed jobs that the RM does not know about? public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, JobID jobId) { - this.conf = new Configuration(conf); // Cloning for modifying. + this.conf = new TezConfiguration(conf); // Cloning for modifying. // For faster redirects from AM to HS. this.conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES, MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES)); this.rm = rm; + tezClient = new TezClient(this.conf); } public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId) @@ -74,17 +87,42 @@ public class ClientServiceDelegate { } public JobStatus getJobStatus(JobID oldJobID) throws IOException { - org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = - TypeConverter.toYarn(oldJobID); - ApplicationReport appReport; try { - appReport = rm.getApplicationReport(jobId.getAppId()); - } catch (YarnRemoteException e) { - throw new IOException(e); + if(dagClient == null) { + appReport = getAppReport(oldJobID); + if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) { + // if job not running return status from appReport; + return getJobStatusFromRM(appReport); + } else { + // job is running. create dag am client + dagClient = tezClient.getDAGClient(appReport.getHost(), + appReport.getRpcPort()); + currentDAGId = dagClient.getAllDAGs().get(0); + } + } + // return status from client. 
use saved appReport for queue etc + DAGStatus dagStatus = dagClient.getDAGStatus(currentDAGId); + return new DAGJobStatus(appReport, dagStatus); + } catch (TezRemoteException e) { + // AM not responding + dagClient = null; + currentDAGId = null; + appReport = getAppReport(oldJobID); + if(appReport.getYarnApplicationState() != YarnApplicationState.RUNNING) { + LOG.info("App not running. Falling back to RM for report."); + } else { + LOG.warn("App running but failed to get report from AM.", e); + } } + + // final fallback + return getJobStatusFromRM(appReport); + } + + private JobStatus getJobStatusFromRM(ApplicationReport appReport) { JobStatus jobStatus = - new DAGJobStatus(appReport); - return jobStatus; + new DAGJobStatus(appReport, null); + return jobStatus; } public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports( @@ -114,4 +152,14 @@ public class ClientServiceDelegate { // FIXME logs for an attempt? throw new UnsupportedOperationException(); } + + private ApplicationReport getAppReport(JobID oldJobID) throws IOException { + org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = + TypeConverter.toYarn(oldJobID); + try { + return rm.getApplicationReport(jobId.getAppId()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ef37823c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java index d58cea4..08c61f8 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java @@ -32,14 +32,20 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationReport; import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.mortbay.log.Log; public class DAGJobStatus extends JobStatus { private final ApplicationReport report; + private final DAGStatus dagStatus; - public DAGJobStatus(ApplicationReport appReport) { + public DAGJobStatus(ApplicationReport appReport, DAGStatus dagStatus) { super(); this.report = appReport; + this.dagStatus = dagStatus; } @Override @@ -129,6 +135,9 @@ public class DAGJobStatus extends JobStatus { @Override public synchronized float getMapProgress() { + if(dagStatus != null) { + return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName()); + } if (report.getYarnApplicationState().equals( YarnApplicationState.FINISHED) && report.getFinalApplicationStatus().equals( @@ -160,6 +169,9 @@ public class DAGJobStatus extends JobStatus { @Override public synchronized float getReduceProgress() { + if(dagStatus != null) { + return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName()); + } if (report.getYarnApplicationState().equals( YarnApplicationState.FINISHED) && report.getFinalApplicationStatus().equals( @@ -350,5 +362,18 @@ public class DAGJobStatus extends JobStatus { buffer.append("needed-mem" + getNeededMem()); return buffer.toString(); } + + private float getProgress(String vertexName) { + Progress progress = dagStatus.getVertexProgress().get(vertexName); + if(progress == null) { + // no such stage. return 0 like MR app currently does. + return 0; + } + float totalTasks = (float) progress.getTotalTaskCount(); + if(totalTasks != 0) { + return progress.getSucceededTaskCount()/totalTasks; + } + return 1; + } }
