Repository: tez Updated Branches: refs/heads/master 692e2a0e7 -> c0d59139c
TEZ-1304. Abstract out client interactions with YARN. Contributed by Jonathan Eagles. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c0d59139 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c0d59139 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c0d59139 Branch: refs/heads/master Commit: c0d59139c0c88a38e12d5b2240f2df7f5314baa4 Parents: 692e2a0 Author: Siddharth Seth <[email protected]> Authored: Sun Jul 27 21:58:54 2014 -0700 Committer: Siddharth Seth <[email protected]> Committed: Sun Jul 27 21:58:54 2014 -0700 ---------------------------------------------------------------------- .../org/apache/tez/client/FrameworkClient.java | 53 +++++++++++++ .../java/org/apache/tez/client/TezClient.java | 29 ++++---- .../org/apache/tez/client/TezClientUtils.java | 3 +- .../org/apache/tez/client/TezYarnClient.java | 78 ++++++++++++++++++++ .../dag/api/client/rpc/DAGClientRPCImpl.java | 19 +++-- .../org/apache/tez/client/TestTezClient.java | 6 +- 6 files changed, 158 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java new file mode 100644 index 0000000..2f97399 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/FrameworkClient.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public abstract class FrameworkClient { + + public static FrameworkClient createFrameworkClient() { + return new TezYarnClient(YarnClient.createYarnClient()); + } + + public abstract void init(Configuration conf); + + public abstract void start(); + + public abstract void stop(); + + public abstract void close() throws IOException; + + public abstract YarnClientApplication createApplication() throws YarnException, IOException; + + public abstract ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext) throws YarnException, IOException; + + public abstract void killApplication(ApplicationId appId) throws YarnException, IOException; + + public abstract ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException; + +} http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index d984877..e36866c 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.common.TezYARNUtils; @@ -89,7 +88,7 @@ public class TezClient { private ApplicationId sessionAppId; private ApplicationId lastSubmittedAppId; private AMConfiguration amConfig; - private YarnClient yarnClient; + private FrameworkClient frameworkClient; private boolean isSession; private boolean sessionStarted = false; private boolean sessionStopped = false; @@ -250,9 +249,9 @@ public class TezClient { public synchronized void start() throws TezException, IOException { amConfig.setYarnConfiguration(new YarnConfiguration(amConfig.getTezConfiguration())); - yarnClient = createYarnClient(); - yarnClient.init(amConfig.getYarnConfiguration()); - yarnClient.start(); + frameworkClient = createFrameworkClient(); + frameworkClient.init(amConfig.getYarnConfiguration()); + frameworkClient.start(); if (isSession) { LOG.info("Session mode. Starting session."); @@ -286,7 +285,7 @@ public class TezClient { TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { appContext.setMaxAppAttempts(1); } - yarnClient.submitApplication(appContext); + frameworkClient.submitApplication(appContext); sessionStarted = true; } catch (YarnException e) { throw new TezException(e); @@ -422,15 +421,15 @@ public class TezClient { + ", sessionName=" + clientName + ", applicationId=" + sessionAppId); try { - yarnClient.killApplication(sessionAppId); + frameworkClient.killApplication(sessionAppId); } catch (YarnException e) { throw new TezException(e); } } } } finally { - if (yarnClient != null) { - yarnClient.close(); + if (frameworkClient != null) { + frameworkClient.close(); } } } @@ -476,7 +475,7 @@ public class TezClient { } Preconditions.checkState(appId != null, "Cannot get status without starting an application"); try { - ApplicationReport appReport = yarnClient.getApplicationReport( + ApplicationReport appReport = frameworkClient.getApplicationReport( appId); switch (appReport.getYarnApplicationState()) { case NEW: @@ -600,15 +599,15 @@ public class TezClient { } // for testing - protected YarnClient createYarnClient() { - return YarnClient.createYarnClient(); + protected FrameworkClient createFrameworkClient() { + return FrameworkClient.createFrameworkClient(); } // for testing protected DAGClientAMProtocolBlockingPB getSessionAMProxy(ApplicationId appId) throws TezException, IOException { return TezClientUtils.getSessionAMProxy( - yarnClient, amConfig.getYarnConfiguration(), appId); + frameworkClient, amConfig.getYarnConfiguration(), appId); } private DAGClientAMProtocolBlockingPB waitForProxy() @@ -672,7 +671,7 @@ public class TezClient { + ", applicationId=" + appId + ", dagName=" + dag.getName()); - yarnClient.submitApplication(appContext); + frameworkClient.submitApplication(appContext); lastSubmittedAppId = appId; } catch (YarnException e) { throw new TezException(e); @@ -682,7 +681,7 @@ public class TezClient { private ApplicationId createApplication() throws TezException, IOException { try { - return yarnClient.createApplication(). + return frameworkClient.createApplication(). getNewApplicationResponse().getApplicationId(); } catch (YarnException e) { throw new TezException(e); http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java index 93c5f34..d99d35d 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -730,7 +729,7 @@ public class TezClientUtils { return textPath; } - static DAGClientAMProtocolBlockingPB getSessionAMProxy(YarnClient yarnClient, + static DAGClientAMProtocolBlockingPB getSessionAMProxy(FrameworkClient yarnClient, Configuration conf, ApplicationId applicationId) throws TezException, IOException { ApplicationReport appReport; http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java new file mode 100644 index 0000000..3d8d24a --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/TezYarnClient.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.exceptions.YarnException; + +public class TezYarnClient extends FrameworkClient { + + private final YarnClient yarnClient; + + protected TezYarnClient(YarnClient yarnClient) { + this.yarnClient = yarnClient; + } + + @Override + public void init(Configuration conf) { + yarnClient.init(conf); + } + + @Override + public void start() { + yarnClient.start(); + } + + @Override + public void stop() { + yarnClient.stop(); + } + + @Override + public final void close() throws IOException { + yarnClient.close(); + } + + @Override + public YarnClientApplication createApplication() throws YarnException, IOException { + return yarnClient.createApplication(); + } + + @Override + public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext) throws YarnException, IOException { + return yarnClient.submitApplication(appSubmissionContext); + } + + @Override + public void killApplication(ApplicationId appId) throws YarnException, IOException { + yarnClient.killApplication(appId); + } + + @Override + public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException { + return yarnClient.getApplicationReport(appId); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index 812dac9..5b63364 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -33,10 +33,9 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.tez.client.FrameworkClient; import org.apache.tez.client.TezClientUtils; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DagTypeConverters; @@ -68,7 +67,7 @@ public class DAGClientRPCImpl implements DAGClient { private final TezConfiguration conf; @VisibleForTesting ApplicationReport appReport; - private YarnClient yarnClient; + private FrameworkClient frameworkClient; @VisibleForTesting DAGClientAMProtocolBlockingPB proxy = null; @@ -77,9 +76,9 @@ public class DAGClientRPCImpl implements DAGClient { this.appId = appId; this.dagId = dagId; this.conf = conf; - yarnClient = new YarnClientImpl(); - yarnClient.init(new YarnConfiguration(conf)); - yarnClient.start(); + frameworkClient = FrameworkClient.createFrameworkClient(); + frameworkClient.init(new YarnConfiguration(conf)); + frameworkClient.start(); appReport = null; } @@ -142,8 +141,8 @@ public class DAGClientRPCImpl implements DAGClient { if (this.proxy != null) { RPC.stopProxy(this.proxy); } - if(yarnClient != null) { - yarnClient.stop(); + if(frameworkClient != null) { + frameworkClient.stop(); } } @@ -190,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient { } ApplicationReport appReport; try { - appReport = yarnClient.getApplicationReport(appId); + appReport = frameworkClient.getApplicationReport(appId); } catch (YarnException e) { throw new TezException(e); } @@ -279,7 +278,7 @@ public class DAGClientRPCImpl implements DAGClient { ApplicationReport getAppReport() throws IOException, TezException { try { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); + ApplicationReport appReport = frameworkClient.getApplicationReport(appId); if (LOG.isDebugEnabled()) { LOG.debug("App: " + appId + " in state: " + appReport.getYarnApplicationState()); http://git-wip-us.apache.org/repos/asf/tez/blob/c0d59139/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index 9039bf4..0454fda 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -61,7 +61,7 @@ import com.google.protobuf.RpcController; public class TestTezClient { class TezClientForTest extends TezClient { - YarnClient mockYarnClient; + TezYarnClient mockYarnClient; DAGClientAMProtocolBlockingPB sessionAmProxy; public TezClientForTest(String name, TezConfiguration tezConf, @@ -71,7 +71,7 @@ public class TestTezClient { } @Override - protected YarnClient createYarnClient() { + protected FrameworkClient createFrameworkClient() { return mockYarnClient; } @@ -112,7 +112,7 @@ public class TestTezClient { DAGClientAMProtocolBlockingPB sessionAmProxy = mock(DAGClientAMProtocolBlockingPB.class, RETURNS_DEEP_STUBS); client.sessionAmProxy = sessionAmProxy; - client.mockYarnClient = yarnClient; + client.mockYarnClient = new TezYarnClient(yarnClient); client.start(); verify(yarnClient, times(1)).init((Configuration)any());
