Repository: tez Updated Branches: refs/heads/master e8269c270 -> dbd763fd4
TEZ-3155. Support a way to submit DAGs to a session where the DAG plan exceeds hadoop ipc limits. (Zhiyuan Yang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dbd763fd Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dbd763fd Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dbd763fd Branch: refs/heads/master Commit: dbd763fd479ccebf3988d23f3284fe1ec2f16d64 Parents: e8269c2 Author: Hitesh Shah <[email protected]> Authored: Wed Mar 9 17:41:18 2016 -0800 Committer: Hitesh Shah <[email protected]> Committed: Wed Mar 9 17:41:18 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/client/TezClient.java | 35 ++++- .../apache/tez/dag/api/TezConfiguration.java | 10 ++ .../src/main/proto/DAGClientAMProtocol.proto | 1 + .../org/apache/tez/client/TestTezClient.java | 69 ++++++++++ .../tez/dag/api/client/DAGClientServer.java | 7 +- ...DAGClientAMProtocolBlockingPBServerImpl.java | 16 ++- .../org/apache/tez/dag/app/DAGAppMaster.java | 26 ++-- .../tez/dag/api/client/TestDAGClientServer.java | 3 +- ...DAGClientAMProtocolBlockingPBServerImpl.java | 135 +++++++++++++++++++ 10 files changed, 284 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d01c732..3c72884 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3155. Support a way to submit DAGs to a session where the DAG plan exceeds hadoop ipc limits. TEZ-2863. Container, node, and logs not available in UI for tasks that fail to launch TEZ-3140. Reduce AM memory usage during serialization TEZ-2756. MergeManager close should not try merging files on close if invoked after a shuffle exception. http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/java/org/apache/tez/client/TezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java index fc98d1a..be59e2f 100644 --- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java +++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java @@ -22,11 +22,17 @@ import java.io.IOException; import java.text.NumberFormat; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tez.common.JavaOptsChecker; import org.apache.tez.common.RPCUtil; +import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.slf4j.Logger; @@ -128,6 +134,12 @@ public class TezClient { private int preWarmDAGCounter = 0; + /* max submitDAG request size through IPC; beyond this we transfer them in the same way we transfer local resource */ + private int maxSubmitDAGRequestSizeThroughIPC; + /* this counter counts number of serialized DAGPlan and is used to give unique name to each serialized DAGPlan */ + private AtomicInteger serializedSubmitDAGPlanRequestCounter = new AtomicInteger(0); + private FileSystem stagingFs = null; + private static final String atsHistoryLoggingServiceClassName = "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService"; private static final String atsHistoryACLManagerClassName = @@ -169,6 +181,10 @@ public class TezClient { this.amConfig = new AMConfiguration(tezConf, localResources, credentials); this.apiVersionInfo = new TezApiVersionInfo(); this.servicePluginsDescriptor = servicePluginsDescriptor; + this.maxSubmitDAGRequestSizeThroughIPC = tezConf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT) - + tezConf.getInt(TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES, + TezConfiguration.TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT); Limits.setConfiguration(tezConf); LOG.info("Tez Client Version: " + apiVersionInfo.toString()); @@ -430,6 +446,8 @@ public class TezClient { } catch (YarnException e) { throw new TezException(e); } + + this.stagingFs = FileSystem.get(amConfig.getTezConfiguration()); } } @@ -507,7 +525,7 @@ public class TezClient { javaOptsChecker); SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder(); - requestBuilder.setDAGPlan(dagPlan).build(); + requestBuilder.setDAGPlan(dagPlan); if (!additionalLocalResources.isEmpty()) { requestBuilder.setAdditionalAmResources(DagTypeConverters .convertFromLocalResources(additionalLocalResources)); @@ -515,6 +533,19 @@ public class TezClient { additionalLocalResources.clear(); + // if request size exceeds maxSubmitDAGRequestSizeThroughIPC, we serialize them to HDFS + SubmitDAGRequestProto request = requestBuilder.build(); + if (request.getSerializedSize() > maxSubmitDAGRequestSizeThroughIPC) { + Path dagPlanPath = new Path(TezCommonUtils.getTezSystemStagingPath(amConfig.getTezConfiguration(), + sessionAppId.toString()), TezConstants.TEZ_PB_PLAN_BINARY_NAME + + serializedSubmitDAGPlanRequestCounter.incrementAndGet()); + + try (FSDataOutputStream fsDataOutputStream = stagingFs.create(dagPlanPath, false)) { + request.writeTo(fsDataOutputStream); + request = requestBuilder.clear().setSerializedRequestPath(stagingFs.resolvePath(dagPlanPath).toString()).build(); + } + } + DAGClientAMProtocolBlockingPB proxy = null; try { proxy = waitForProxy(); @@ -533,7 +564,7 @@ public class TezClient { } try { - SubmitDAGResponseProto response = proxy.submitDAG(null, requestBuilder.build()); + SubmitDAGResponseProto response = proxy.submitDAG(null, request); // the following check is only for testing since the final class // SubmitDAGResponseProto cannot be mocked if (response != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 221ac47..0221e6b 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -1528,6 +1528,16 @@ public class TezConfiguration extends Configuration { public static final String TEZ_CLIENT_ASYNCHRONOUS_STOP = TEZ_PREFIX + "client.asynchronous-stop"; public static final boolean TEZ_CLIENT_ASYNCHRONOUS_STOP_DEFAULT = true; + /** + * Int value. SubmitDAGPlanRequest cannot be larger than Max IPC message size minus this number; otherwise, it will + * be serialized to HDFS and we transfer the path to server. Server will deserialize the request from HDFS. + */ + @Private + @ConfigurationScope(Scope.CLIENT) + @ConfigurationProperty(type="int") + public static final String TEZ_IPC_PAYLOAD_RESERVED_BYTES = TEZ_PREFIX + "ipc.payload.reserved.bytes"; + public static final int TEZ_IPC_PAYLOAD_RESERVED_BYTES_DEFAULT = 5 * 1024 * 1024; + // for Recovery Test @Private @ConfigurationScope(Scope.TEST) http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/main/proto/DAGClientAMProtocol.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto index a8171e7..113c9cc 100644 --- a/tez-api/src/main/proto/DAGClientAMProtocol.proto +++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto @@ -63,6 +63,7 @@ message TryKillDAGResponseProto { message SubmitDAGRequestProto { optional DAGPlan d_a_g_plan = 1; optional PlanLocalResourcesProto additional_am_resources = 2; + optional string serializedRequestPath = 3; } message SubmitDAGResponseProto { http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java index c2531c6..a2e4956 100644 --- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java +++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java @@ -19,6 +19,8 @@ package org.apache.tez.client; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -42,6 +44,9 @@ import static org.mockito.Mockito.when; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -64,6 +69,7 @@ import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; @@ -155,6 +161,69 @@ public class TestTezClient { public void testTezclientSession() throws Exception { testTezClient(true); } + + @Test (timeout = 5000) + public void testTezClientSessionLargeDAGPlan() throws Exception { + // request size is within threshold of being serialized + _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 10, false); + // DAGPlan exceeds the threshold but is still less than max IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 6*1024*1024, 10, true); + // DAGPlan exceeds max IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 15*1024*1024, 10, true); + // amResources exceeds the threshold but is still less than max IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 6*1024*1024, true); + // amResources exceeds max IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 10, 15*1024*1024, true); + // DAGPlan and amResources together exceed threshold but less than IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 3*1024*1024, 3*1024*1024, true); + // DAGPlan and amResources all exceed max IPC size + _testTezClientSessionLargeDAGPlan(10*1024*1024, 15*1024*1024, 15*1024*1024, true); + } + + private void _testTezClientSessionLargeDAGPlan(int maxIPCMsgSize, int payloadSize, int amResourceSize, + boolean shouldSerialize) throws Exception { + TezConfiguration conf = new TezConfiguration(); + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize); + conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, "target/"+this.getClass().getName()); + TezClientForTest client = configureAndCreateTezClient(null, true, conf); + + Map<String, LocalResource> localResourceMap = new HashMap<>(); + byte[] bytes = new byte[amResourceSize]; + Arrays.fill(bytes, (byte)1); + String lrName = new String(bytes); + localResourceMap.put(lrName, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); + + ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("P"); + processorDescriptor.setUserPayload(UserPayload.create(ByteBuffer.allocate(payloadSize))); + Vertex vertex = Vertex.create("Vertex", processorDescriptor, 1, Resource.newInstance(1, 1)); + DAG dag = DAG.create("DAG").addVertex(vertex); + + client.start(); + client.addAppMasterLocalFiles(localResourceMap); + client.submitDAG(dag); + client.stop(); + + ArgumentCaptor<SubmitDAGRequestProto> captor = ArgumentCaptor.forClass(SubmitDAGRequestProto.class); + verify(client.sessionAmProxy).submitDAG((RpcController)any(), captor.capture()); + SubmitDAGRequestProto request = captor.getValue(); + + if (shouldSerialize) { + /* we need manually delete the serialized dagplan since staging path here won't be destroyed */ + Path dagPlanPath = new Path(request.getSerializedRequestPath()); + FileSystem fs = FileSystem.getLocal(conf); + fs.deleteOnExit(dagPlanPath); + fs.delete(dagPlanPath, false); + + assertTrue(request.hasSerializedRequestPath()); + assertFalse(request.hasDAGPlan()); + assertFalse(request.hasAdditionalAmResources()); + } else { + assertFalse(request.hasSerializedRequestPath()); + assertTrue(request.hasDAGPlan()); + assertTrue(request.hasAdditionalAmResources()); + } + } public void testTezClient(boolean isSession) throws Exception { Map<String, LocalResource> lrs = Maps.newHashMap(); http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java index 029761c..38f6740 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientServer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -50,12 +51,14 @@ public class DAGClientServer extends AbstractService { DAGClientHandler realInstance; Server server; InetSocketAddress bindAddress; + final FileSystem stagingFs; public DAGClientServer(DAGClientHandler realInstance, - ApplicationAttemptId attemptId) { + ApplicationAttemptId attemptId, FileSystem stagingFs) { super("DAGClientRPCServer"); this.realInstance = realInstance; this.secretManager = new ClientToAMTokenSecretManager(attemptId, null); + this.stagingFs = stagingFs; } @Override @@ -65,7 +68,7 @@ public class DAGClientServer extends AbstractService { InetSocketAddress addr = new InetSocketAddress(0); DAGClientAMProtocolBlockingPBServerImpl service = - new DAGClientAMProtocolBlockingPBServerImpl(realInstance); + new DAGClientAMProtocolBlockingPBServerImpl(realInstance, stagingFs); BlockingService blockingService = DAGClientAMProtocol.newReflectiveBlockingService(service); http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index fc6b267..32124b9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -23,6 +23,9 @@ import java.security.AccessControlException; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezAppMasterStatus; @@ -55,9 +58,11 @@ import com.google.protobuf.ServiceException; public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProtocolBlockingPB { DAGClientHandler real; + final FileSystem stagingFs; - public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real) { + public DAGClientAMProtocolBlockingPBServerImpl(DAGClientHandler real, FileSystem stagingFs) { this.real = real; + this.stagingFs = stagingFs; } private UserGroupInformation getRPCUser() throws ServiceException { @@ -152,6 +157,15 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto throw new AccessControlException("User " + user + " cannot perform AM modify operation"); } try{ + if (request.hasSerializedRequestPath()) { + // need to deserialize large request from hdfs + Path requestPath = new Path(request.getSerializedRequestPath()); + try (FSDataInputStream fsDataInputStream = stagingFs.open(requestPath)) { + request = SubmitDAGRequestProto.parseFrom(fsDataInputStream); + } catch (IOException e) { + throw wrapException(e); + } + } DAGPlan dagPlan = request.getDAGPlan(); Map<String, LocalResource> additionalResources = null; if (request.hasAdditionalAmResources()) { http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 81a7791..eb9660c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -482,7 +482,19 @@ public class DAGAppMaster extends AbstractService { addIfService(dispatcher, false); - clientRpcServer = new DAGClientServer(clientHandler, appAttemptID); + recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf); + recoveryFS = recoveryDataDir.getFileSystem(conf); + currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, + appAttemptID.getAttemptId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID + + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir + + " recoveryAttemptDir :" + currentRecoveryDataDir); + } + recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT); + + clientRpcServer = new DAGClientServer(clientHandler, appAttemptID, recoveryFS); addIfService(clientRpcServer, true); taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf); @@ -589,18 +601,6 @@ public class DAGAppMaster extends AbstractService { TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT); - recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, conf); - recoveryFS = recoveryDataDir.getFileSystem(conf); - currentRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, - appAttemptID.getAttemptId()); - if (LOG.isDebugEnabled()) { - LOG.debug("Stage directory information for AppAttemptId :" + this.appAttemptID - + " tezSystemStagingDir :" + tezSystemStagingDir + " recoveryDataDir :" + recoveryDataDir - + " recoveryAttemptDir :" + currentRecoveryDataDir); - } - recoveryEnabled = conf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, - TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT); - if (!versionMismatch) { if (isSession) { FileInputStream sessionResourcesStream = null; http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java index bf57cc1..06280d8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientServer.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; @@ -55,9 +56,9 @@ public class TestDAGClientServer { try { DAGClientHandler mockDAGClientHander = mock(DAGClientHandler.class); ApplicationAttemptId mockAppAttempId = mock(ApplicationAttemptId.class); - clientServer = new DAGClientServer(mockDAGClientHander, mockAppAttempId); Configuration conf = new Configuration(); conf.set(TezConfiguration.TEZ_AM_CLIENT_AM_PORT_RANGE, port + "-" + port); + clientServer = new DAGClientServer(mockDAGClientHander, mockAppAttempId, mock(FileSystem.class)); clientServer.init(conf); clientServer.start(); int resultedPort = clientServer.getBindAddress().getPort(); http://git-wip-us.apache.org/repos/asf/tez/blob/dbd763fd/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java new file mode 100644 index 0000000..3c030b3 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClientAMProtocolBlockingPBServerImpl.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client.rpc; + +import java.io.File; +import java.io.FileOutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.tez.common.security.ACLManager; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.client.DAGClientHandler; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.SubmitDAGRequestProto; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +public class TestDAGClientAMProtocolBlockingPBServerImpl { + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(new File("target")); + + @Captor + private ArgumentCaptor<Map<String, LocalResource>> localResourcesCaptor; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + } + + @Test(timeout = 5000) + @SuppressWarnings("unchecked") + public void testSubmitDagInSessionWithLargeDagPlan() throws Exception { + int maxIPCMsgSize = 1024; + String dagPlanName = "dagplan-name"; + File requestFile = tmpFolder.newFile("request-file"); + TezConfiguration conf = new TezConfiguration(); + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, maxIPCMsgSize); + + byte[] randomBytes = new byte[2*maxIPCMsgSize]; + (new Random()).nextBytes(randomBytes); + UserPayload payload = UserPayload.create(ByteBuffer.wrap(randomBytes)); + Vertex vertex = Vertex.create("V", ProcessorDescriptor.create("P").setUserPayload(payload), 1); + DAGPlan dagPlan = DAG.create(dagPlanName).addVertex(vertex).createDag(conf, null, null, null, false); + + String lrName = "localResource"; + String scheme = "file"; + String host = "localhost"; + int port = 80; + String path = "/test"; + URL lrURL = URL.newInstance(scheme, host, port, path); + LocalResource localResource = LocalResource.newInstance(lrURL, LocalResourceType.FILE, + LocalResourceVisibility.PUBLIC, 1, 1); + Map<String, LocalResource> localResources = new HashMap<>(); + localResources.put(lrName, localResource); + + SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan) + .setAdditionalAmResources(DagTypeConverters.convertFromLocalResources(localResources)); + try (FileOutputStream fileOutputStream = new FileOutputStream(requestFile)) { + requestBuilder.build().writeTo(fileOutputStream); + } + + DAGClientHandler dagClientHandler = mock(DAGClientHandler.class); + ACLManager aclManager = mock(ACLManager.class); + DAGClientAMProtocolBlockingPBServerImpl serverImpl = spy(new DAGClientAMProtocolBlockingPBServerImpl( + dagClientHandler, FileSystem.get(conf))); + when(dagClientHandler.getACLManager()).thenReturn(aclManager); + when(dagClientHandler.submitDAG((DAGPlan)any(), (Map<String, LocalResource>)any())).thenReturn("dag-id"); + when(aclManager.checkAMModifyAccess((UserGroupInformation) any())).thenReturn(true); + + requestBuilder.clear().setSerializedRequestPath(requestFile.getAbsolutePath()); + serverImpl.submitDAG(null, requestBuilder.build()); + + ArgumentCaptor<DAGPlan> dagPlanCaptor = ArgumentCaptor.forClass(DAGPlan.class); + verify(dagClientHandler).submitDAG(dagPlanCaptor.capture(), localResourcesCaptor.capture()); + dagPlan = dagPlanCaptor.getValue(); + localResources = localResourcesCaptor.getValue(); + + assertEquals(dagPlan.getName(), dagPlanName); + assertEquals(dagPlan.getVertexCount(), 1); + assertTrue(dagPlan.getSerializedSize() > maxIPCMsgSize); + assertArrayEquals(randomBytes, dagPlan.getVertex(0).getProcessorDescriptor().getTezUserPayload().getUserPayload(). + toByteArray()); + assertEquals(localResources.size(), 1); + assertTrue(localResources.containsKey(lrName)); + localResource = localResources.get(lrName); + assertEquals(localResource.getType(), LocalResourceType.FILE); + assertEquals(localResource.getVisibility(), LocalResourceVisibility.PUBLIC); + lrURL = localResource.getResource(); + assertEquals(lrURL.getScheme(), scheme); + assertEquals(lrURL.getHost(), host); + assertEquals(lrURL.getPort(), port); + assertEquals(lrURL.getFile(), path); + } +}
