Repository: tez Updated Branches: refs/heads/master ff7081e06 -> 3e785f183
TEZ-1319. TEZ-1321. Remove methods annotated as @Private from TezClient and DAGClient. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3e785f18 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3e785f18 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3e785f18 Branch: refs/heads/master Commit: 3e785f183ad5482d3bcf815a01966c2af146df99 Parents: ff7081e Author: Siddharth Seth <[email protected]> Authored: Mon Jul 28 15:19:44 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Mon Jul 28 15:19:44 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/client/TezClient.java | 10 +- .../apache/tez/dag/api/client/DAGClient.java | 20 ++-- .../dag/api/client/rpc/DAGClientRPCImpl.java | 4 +- .../tez/dag/api/client/rpc/TestDAGClient.java | 2 +- .../examples/GroupByOrderByMRRTest.java | 4 +- .../java/org/apache/tez/client/MRTezClient.java | 54 +++++++++++ .../apache/tez/dag/api/client/MRDAGClient.java | 96 ++++++++++++++++++++ .../apache/tez/mapreduce/client/YARNRunner.java | 18 ++-- 8 files changed, 179 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index e36866c..28546c4 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -643,9 +643,8 @@ public class TezClient { return submitDAGApplication(appId, dag); } - @Private - // To be used only by YarnRunner - public DAGClient submitDAGApplication(ApplicationId appId, DAG dag) + @Private // To be used only by YarnRunner + DAGClient 
submitDAGApplication(ApplicationId appId, DAG dag) throws TezException, IOException { LOG.info("Submitting DAG application with id: " + appId); try { @@ -655,7 +654,7 @@ public class TezClient { if (credentials == null) { credentials = new Credentials(); } - TezClientUtils.processTezLocalCredentialsFile(credentials, + TezClientUtils.processTezLocalCredentialsFile(credentials, amConfig.getTezConfiguration()); // Add session token for shuffle @@ -698,7 +697,7 @@ public class TezClient { } @Private // Used only for MapReduce compatibility code - public static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf) + static DAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf) throws IOException, TezException { return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), tezConf); } @@ -716,6 +715,7 @@ public class TezClient { } }; + // Used only for MapReduce compatibility code private static String getDefaultTezDAGID(ApplicationId appId) { return (new StringBuilder(DAG)).append(SEPARATOR). append(appId.getClusterTimestamp()). http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index 74a5b77..a3e42db 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -33,13 +33,13 @@ import org.apache.tez.dag.api.Vertex; * Interface class for monitoring the <code>DAG</code> running in a Tez DAG * Application Master. 
*/ -public interface DAGClient extends Closeable { +public abstract class DAGClient implements Closeable { /** * Get the YARN ApplicationId for the app running the DAG * @return <code>ApplicationId</code> */ - public ApplicationId getApplicationId(); + public abstract ApplicationId getApplicationId(); @Private /** @@ -48,14 +48,14 @@ public interface DAGClient extends Closeable { * may be null. * @return <code>ApplicationReport</code> or null */ - public ApplicationReport getApplicationReport(); + protected abstract ApplicationReport getApplicationReportInternal(); /** * Get the status of the specified DAG * @param statusOptions Optionally, retrieve additional information based on * specified options */ - public DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions) + public abstract DAGStatus getDAGStatus(Set<StatusGetOpts> statusOptions) throws IOException, TezException; /** @@ -63,7 +63,7 @@ public interface DAGClient extends Closeable { * @param statusOptions Optionally, retrieve additional information based on * specified options */ - public VertexStatus getVertexStatus(String vertexName, + public abstract VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws IOException, TezException; @@ -71,7 +71,7 @@ public interface DAGClient extends Closeable { * Kill a running DAG * */ - public void tryKillDAG() throws IOException, TezException; + public abstract void tryKillDAG() throws IOException, TezException; /** * Wait for DAG to complete without printing any vertex statuses @@ -80,12 +80,12 @@ public interface DAGClient extends Closeable { * @throws IOException * @throws TezException */ - public DAGStatus waitForCompletion() throws IOException, TezException; + public abstract DAGStatus waitForCompletion() throws IOException, TezException; /** * Wait for DAG to complete and print the selected vertex status periodically. 
* - * @param vertexNames + * @param vertices * which vertex details to print; null mean no vertex status and it * is equivalent to call <code>waitForCompletion()</code> * @param statusGetOpts @@ -95,7 +95,7 @@ public interface DAGClient extends Closeable { * @throws IOException * @throws TezException */ - public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<Vertex> vertices, + public abstract DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<Vertex> vertices, @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException; /** @@ -108,6 +108,6 @@ public interface DAGClient extends Closeable { * @throws IOException * @throws TezException */ - DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) + public abstract DAGStatus waitForCompletionWithAllStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException; } http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 5b63364..bf8fa39 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -57,7 +57,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ServiceException; -public class DAGClientRPCImpl implements DAGClient { +public class DAGClientRPCImpl extends DAGClient { private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class); private static final long SLEEP_FOR_COMPLETION = 500; @@ -147,7 +147,7 @@ public class DAGClientRPCImpl implements DAGClient { } 
@Override - public ApplicationReport getApplicationReport() { + protected ApplicationReport getApplicationReportInternal() { return appReport; } http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java index 7deed48..62aca7d 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java @@ -199,7 +199,7 @@ public class TestDAGClient { @Test public void testApp() throws IOException, TezException, ServiceException{ assertEquals(mockAppId, dagClient.getApplicationId()); - assertEquals(mockAppReport, dagClient.getApplicationReport()); + assertEquals(mockAppReport, dagClient.getApplicationReportInternal()); } @Test http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java index 59deb6b..26dbee9 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java @@ -39,7 +39,7 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.client.TezClient; +import org.apache.tez.client.MRTezClient; 
import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; @@ -199,7 +199,7 @@ public class GroupByOrderByMRRTest extends Configured implements Tool { JobID jobId = job.getJobID(); ApplicationId appId = TypeConverter.toYarn(jobId).getAppId(); - DAGClient dagClient = TezClient.getDAGClient(appId, new TezConfiguration(conf)); + DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf)); DAGStatus dagStatus; String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" }; while (true) { http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java new file mode 100644 index 0000000..49ac3ff --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/client/MRTezClient.java @@ -0,0 +1,54 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.tez.client; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.MRDAGClient; + +@InterfaceAudience.Private +public class MRTezClient extends TezClient { + public MRTezClient(String name, TezConfiguration tezConf, boolean isSession, + @Nullable Map<String, LocalResource> localResources, + @Nullable Credentials credentials) { + super(name, tezConf, isSession, localResources, credentials); + } + + // To be used only by YarnRunner + public DAGClient submitDAGApplication(ApplicationId appId, org.apache.tez.dag.api.DAG dag) + throws TezException, IOException { + return super.submitDAGApplication(appId, dag); + } + + public static MRDAGClient getDAGClient(ApplicationId appId, TezConfiguration tezConf) + throws IOException, TezException { + return new MRDAGClient(TezClient.getDAGClient(appId, tezConf)); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java new file mode 100644 index 0000000..7de00b5 --- /dev/null +++ b/tez-mapreduce/src/main/java/org/apache/tez/dag/api/client/MRDAGClient.java @@ -0,0 +1,96 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. 
See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tez.dag.api.client; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.Vertex; + +@InterfaceAudience.Private +public class MRDAGClient extends DAGClient { + + private final DAGClient realClient; + + public MRDAGClient(DAGClient dagClient) { + this.realClient = dagClient; + } + + @Override + public ApplicationId getApplicationId() { + return realClient.getApplicationId(); + } + + @Override + protected ApplicationReport getApplicationReportInternal() { + return realClient.getApplicationReportInternal(); + } + + public ApplicationReport getApplicationReport() { + return getApplicationReportInternal(); + } + + @Override + public DAGStatus getDAGStatus( + Set<StatusGetOpts> statusOptions) throws IOException, TezException { + return realClient.getDAGStatus(statusOptions); + } + + @Override + public VertexStatus getVertexStatus(String vertexName, + Set<StatusGetOpts> statusOptions) throws IOException, + TezException { + return 
realClient.getVertexStatus(vertexName, statusOptions); + } + + @Override + public void tryKillDAG() throws IOException, TezException { + realClient.tryKillDAG(); + } + + @Override + public DAGStatus waitForCompletion() throws IOException, TezException { + return realClient.waitForCompletion(); + } + + @Override + public DAGStatus waitForCompletionWithStatusUpdates( + @Nullable Set<Vertex> vertices, + @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException { + return realClient.waitForCompletionWithStatusUpdates(vertices, statusGetOpts); + } + + @Override + public DAGStatus waitForCompletionWithAllStatusUpdates( + @Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException { + return realClient.waitForCompletionWithAllStatusUpdates(statusGetOpts); + } + + @Override + public void close() throws IOException { + realClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/3e785f18/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java index f758f6a..d785be3 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java @@ -77,7 +77,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; -import org.apache.tez.client.TezClient; +import org.apache.tez.client.MRTezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -86,8 +86,8 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import 
org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; -import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.MRDAGClient; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.MRHelpers; @@ -120,8 +120,8 @@ public class YARNRunner implements ClientProtocol { final public static int UTF8_CHUNK_SIZE = 16 * 1024; private final TezConfiguration tezConf; - private TezClient tezSession; - private DAGClient dagClient; + private MRTezClient tezClient; + private MRDAGClient dagClient; /** * Yarn runner incapsulates the client interface of @@ -615,10 +615,10 @@ public class YARNRunner implements ClientProtocol { dagAMConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, jobConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); - tezSession = new TezClient("MapReduce", dagAMConf, false, jobLocalResources, ts); - tezSession.start(); - tezSession.submitDAGApplication(appId, dag); - tezSession.stop(); + tezClient = new MRTezClient("MapReduce", dagAMConf, false, jobLocalResources, ts); + tezClient.start(); + tezClient.submitDAGApplication(appId, dag); + tezClient.stop(); } catch (TezException e) { throw new IOException(e); } @@ -676,7 +676,7 @@ public class YARNRunner implements ClientProtocol { DAGStatus dagStatus; try { if(dagClient == null) { - dagClient = TezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf); + dagClient = MRTezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId(), tezConf); } dagStatus = dagClient.getDAGStatus(null); return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
