http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java new file mode 100644 index 0000000..14c19b5 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/ExternalTezServiceTestHelper.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.tez.tests;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Map;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.test.MiniTezCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test fixture that starts a MiniDFSCluster, a MiniTezCluster and a MiniTezTestServiceCluster,
 * and can optionally run a shared session-mode TezClient configured with caller-supplied
 * service plugins.
 *
 * <p>Current usage:
 * <ol>
 *   <li>construct the helper during setup ({@code @BeforeClass}), which starts all clusters;</li>
 *   <li>call {@link #setupSharedTezClient(ServicePluginsDescriptor)};</li>
 *   <li>call {@link #tearDownAll()} when done ({@code @AfterClass}).</li>
 * </ol>
 * Alternately, the shared client can be stopped independently via
 * {@link #shutdownSharedTezClient()}.
 */
public class ExternalTezServiceTestHelper {

  private static final Logger LOG = LoggerFactory.getLogger(ExternalTezServiceTestHelper.class);

  private volatile MiniTezCluster tezCluster;
  private volatile MiniDFSCluster dfsCluster;
  private volatile MiniTezTestServiceCluster tezTestServiceCluster;

  private volatile Configuration clusterConf = new Configuration();
  private volatile Configuration confForJobs;

  private volatile FileSystem remoteFs;

  private volatile TezClient sharedTezClient;

  /**
   * Starts the mini DFS, Tez and external-service clusters and prepares the job configuration
   * returned by {@link #getConfForJobs()}.
   *
   * <p>If a step fails after the MiniDFSCluster has come up, everything started so far is
   * stopped before the exception is rethrown. Callers typically assign the helper to a static
   * field in {@code @BeforeClass}. When this constructor throws, that field stays {@code null},
   * and an {@code @AfterClass} hook could never reach the running clusters.
   *
   * @param testRootDir base directory used for the MiniDFSCluster
   * @throws IOException if a post-DFS startup step, such as creating the staging directory, fails
   * @throws RuntimeException wrapping the cause if the MiniDFSCluster cannot be started
   */
  public ExternalTezServiceTestHelper(String testRootDir) throws
      IOException {
    try {
      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testRootDir);
      dfsCluster =
          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
      remoteFs = dfsCluster.getFileSystem();
      LOG.info("MiniDFSCluster started");
    } catch (IOException io) {
      throw new RuntimeException("problem starting mini dfs cluster", io);
    }

    try {
      startTezClusters();
    } catch (IOException | RuntimeException e) {
      // Do not leak the clusters that did start; keep the startup failure as the primary error.
      try {
        stopServices();
      } catch (Exception cleanupFailure) {
        e.addSuppressed(cleanupFailure);
      }
      throw e;
    }
  }

  /**
   * Starts the MiniTezCluster and the external MiniTezTestServiceCluster on top of the already
   * running DFS, then builds {@code confForJobs} from the combined cluster configuration.
   */
  private void startTezClusters() throws IOException {
    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
    tezCluster.init(conf);
    tezCluster.start();
    LOG.info("MiniTezCluster started");

    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
      clusterConf.set(entry.getKey(), entry.getValue());
    }
    long jvmMax = Runtime.getRuntime().maxMemory();

    tezTestServiceCluster = MiniTezTestServiceCluster
        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
    tezTestServiceCluster.init(clusterConf);
    tezTestServiceCluster.start();
    LOG.info("MiniTezTestServer started");

    // Jobs see the cluster config plus the service cluster's own settings layered on top.
    confForJobs = new Configuration(clusterConf);
    for (Map.Entry<String, String> entry : tezTestServiceCluster
        .getClusterSpecificConfiguration()) {
      confForJobs.set(entry.getKey(), entry.getValue());
    }

    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
    remoteFs.mkdirs(stagingDirPath);
    // This is currently configured to push tasks into the Service, and then use the standard RPC
    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
    confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
  }

  /**
   * Creates and starts a session-mode TezClient from {@link #getConfForJobs()} using the given
   * service plugins, then waits until the session is ready for DAG submission.
   *
   * @param servicePluginsDescriptor plugins (schedulers, launchers, communicators) for the AM
   */
  public void setupSharedTezClient(ServicePluginsDescriptor servicePluginsDescriptor) throws
      IOException, TezException, InterruptedException {
    // Create a session to use for all tests.
    TezConfiguration tezClientConf = new TezConfiguration(confForJobs);

    sharedTezClient = TezClient
        .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf)
        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();

    sharedTezClient.start();
    LOG.info("Shared TezSession started");
    sharedTezClient.waitTillReady();
    LOG.info("Shared TezSession ready for submission");
  }

  /**
   * Stops the shared TezClient (if any) followed by the external-service, Tez and DFS mini
   * clusters. Every step is attempted even if an earlier one throws. When several steps fail,
   * the exception from the last failing step is the one that propagates.
   */
  public void tearDownAll() throws IOException, TezException {
    stopServices();
  }

  /**
   * Shared shutdown sequence used by {@link #tearDownAll()} and by constructor failure cleanup.
   * It is private so the constructor never calls an overridable method.
   */
  private void stopServices() throws IOException, TezException {
    try {
      if (sharedTezClient != null) {
        sharedTezClient.stop();
        sharedTezClient = null;
      }
    } finally {
      try {
        if (tezTestServiceCluster != null) {
          tezTestServiceCluster.stop();
          tezTestServiceCluster = null;
        }
      } finally {
        try {
          if (tezCluster != null) {
            tezCluster.stop();
            tezCluster = null;
          }
        } finally {
          if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
          }
        }
      }
    }
  }

  /** Stops the shared TezClient if one is running; the mini clusters keep running. */
  public void shutdownSharedTezClient() throws IOException, TezException {
    if (sharedTezClient != null) {
      sharedTezClient.stop();
      sharedTezClient = null;
    }
  }

  /**
   * Generates hash-join input data and the expected result, then runs the join itself.
   *
   * <p>Creates {@code srcDataDir} on the remote FS. It then runs JoinDataGen with 2 tasks,
   * writing {@code dataPath1}, {@code dataPath2} and {@code expectedResultPath}. Finally it runs
   * HashJoinExample with 2 reducers, writing {@code outputPath}. Both jobs run through the shared
   * TezClient and must exit with 0.
   */
  public void setupHashJoinData(Path srcDataDir, Path dataPath1, Path dataPath2,
      Path expectedResultPath, Path outputPath) throws
      Exception {
    remoteFs.mkdirs(srcDataDir);
    TezConfiguration tezConf = new TezConfiguration(confForJobs);
    // Generate join data - with 2 tasks.
    JoinDataGen dataGen = new JoinDataGen();
    String[] dataGenArgs = new String[]{
        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
        expectedResultPath.toString(), "2"};
    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
    // Run the actual join - with 2 reducers
    HashJoinExample joinExample = new HashJoinExample();
    String[] args = new String[]{
        dataPath1.toString(), dataPath2.toString(), "2", outputPath.toString()};
    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
    LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
  }


  public MiniTezCluster getTezCluster() {
    return tezCluster;
  }

  public MiniDFSCluster getDfsCluster() {
    return dfsCluster;
  }

  public MiniTezTestServiceCluster getTezTestServiceCluster() {
    return tezTestServiceCluster;
  }

  public Configuration getClusterConf() {
    return clusterConf;
  }

  public Configuration getConfForJobs() {
    return confForJobs;
  }

  public FileSystem getRemoteFs() {
    return remoteFs;
  }

  /**
   * Returns the shared session client.
   *
   * @throws NullPointerException if {@link #setupSharedTezClient} has not been called, or the
   *     client has since been shut down
   */
  public TezClient getSharedTezClient() {
    Preconditions.checkNotNull(sharedTezClient,
        "Shared TezClient is not running; call setupSharedTezClient first");
    return sharedTezClient;
  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java index 3701455..920534a 100644 --- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -17,13 +17,8 @@ package org.apache.tez.tests; import static org.junit.Assert.assertEquals; import java.io.IOException; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -37,18 +32,13 @@ import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; -import org.apache.tez.examples.HashJoinExample; -import org.apache.tez.examples.JoinDataGen; import org.apache.tez.examples.JoinValidateConfigured; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.processor.SleepProcessor; -import org.apache.tez.service.MiniTezTestServiceCluster; import org.apache.tez.service.impl.ContainerRunnerImpl; import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; import 
org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; -import org.apache.tez.test.MiniTezCluster; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -61,17 +51,7 @@ public class TestExternalTezServices { private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; - private static volatile MiniTezCluster tezCluster; - private static volatile MiniDFSCluster dfsCluster; - private static volatile MiniTezTestServiceCluster tezTestServiceCluster; - - private static volatile Configuration clusterConf = new Configuration(); - private static volatile Configuration confForJobs; - - private static volatile FileSystem remoteFs; - private static volatile FileSystem localFs; - - private static volatile TezClient sharedTezClient; + private static ExternalTezServiceTestHelper extServiceTestHelper; private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName()); private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath"); @@ -93,50 +73,8 @@ public class TestExternalTezServices { @BeforeClass public static void setup() throws Exception { - localFs = FileSystem.getLocal(clusterConf); - - try { - clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR); - dfsCluster = - new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build(); - remoteFs = dfsCluster.getFileSystem(); - LOG.info("MiniDFSCluster started"); - } catch (IOException io) { - throw new RuntimeException("problem starting mini dfs cluster", io); - } - - tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1); - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS - tezCluster.init(conf); - tezCluster.start(); - LOG.info("MiniTezCluster started"); - - clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS - for (Map.Entry<String, String> entry : 
tezCluster.getConfig()) { - clusterConf.set(entry.getKey(), entry.getValue()); - } - long jvmMax = Runtime.getRuntime().maxMemory(); - - tezTestServiceCluster = MiniTezTestServiceCluster - .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1); - tezTestServiceCluster.init(clusterConf); - tezTestServiceCluster.start(); - LOG.info("MiniTezTestServer started"); - - confForJobs = new Configuration(clusterConf); - for (Map.Entry<String, String> entry : tezTestServiceCluster - .getClusterSpecificConfiguration()) { - confForJobs.set(entry.getKey(), entry.getValue()); - } - - Path stagingDirPath = new Path("/tmp/tez-staging-dir"); - remoteFs.mkdirs(stagingDirPath); - // This is currently configured to push tasks into the Service, and then use the standard RPC - confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); - confForJobs.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); - - UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs); + extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR); + UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs()); TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ TaskSchedulerDescriptor @@ -156,60 +94,21 @@ public class TestExternalTezServices { ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true, taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors); - // Create a session to use for all tests. 
- TezConfiguration tezClientConf = new TezConfiguration(confForJobs); - sharedTezClient = TezClient - .newBuilder(TestExternalTezServices.class.getSimpleName() + "_session", tezClientConf) - .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build(); + extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor); - sharedTezClient.start(); - LOG.info("Shared TezSession started"); - sharedTezClient.waitTillReady(); - LOG.info("Shared TezSession ready for submission"); // Generate the join data set used for each run. // Can a timeout be enforced here ? - remoteFs.mkdirs(SRC_DATA_DIR); Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1"); Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2"); - TezConfiguration tezConf = new TezConfiguration(confForJobs); - // Generate join data - with 2 tasks. - JoinDataGen dataGen = new JoinDataGen(); - String[] dataGenArgs = new String[]{ - dataPath1.toString(), "1048576", dataPath2.toString(), "524288", - HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"}; - assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient)); - // Run the actual join - with 2 reducers - HashJoinExample joinExample = new HashJoinExample(); - String[] args = new String[]{ - dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()}; - assertEquals(0, joinExample.run(tezConf, args, sharedTezClient)); - - LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result"); + extServiceTestHelper + .setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH); } @AfterClass public static void tearDown() throws IOException, TezException { - if (sharedTezClient != null) { - sharedTezClient.stop(); - sharedTezClient = null; - } - - if (tezTestServiceCluster != null) { - tezTestServiceCluster.stop(); - tezTestServiceCluster = null; - } - - if (tezCluster != null) { - tezCluster.stop(); - tezCluster = null; - } - if (dfsCluster != null) { - 
dfsCluster.shutdown(); - dfsCluster = null; - } - // TODO Add cleanup code. + extServiceTestHelper.tearDownAll(); } @@ -297,7 +196,7 @@ public class TestExternalTezServices { v.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH); dag.addVertex(v); - DAGClient dagClient = sharedTezClient.submitDAG(dag); + DAGClient dagClient = extServiceTestHelper.getSharedTezClient().submitDAG(dag); DAGStatus dagStatus = dagClient.waitForCompletion(); assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); assertEquals(1, dagStatus.getDAGProgress().getFailedTaskAttemptCount()); @@ -309,18 +208,18 @@ public class TestExternalTezServices { VertexExecutionContext rhsContext, VertexExecutionContext validateContext) throws Exception { - int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); + int externalSubmissionCount = extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions(); - TezConfiguration tezConf = new TezConfiguration(confForJobs); + TezConfiguration tezConf = new TezConfiguration(extServiceTestHelper.getConfForJobs()); JoinValidateConfigured joinValidate = new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, rhsContext, validateContext, name); String[] validateArgs = new String[]{"-disableSplitGrouping", HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; - assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient)); + assertEquals(0, joinValidate.run(tezConf, validateArgs, extServiceTestHelper.getSharedTezClient())); // Ensure this was actually submitted to the external cluster assertEquals(extExpectedCount, - (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount)); + (extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions() - externalSubmissionCount)); } } http://git-wip-us.apache.org/repos/asf/tez/blob/1d765431/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java 
---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java new file mode 100644 index 0000000..bfd3ed2 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServicesErrors.java @@ -0,0 +1,235 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.tez.tests;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.EnumSet;

import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a join-validation DAG in which one vertex is routed through an error-injecting service
 * plugin: the task scheduler, container launcher, or task communicator registered under
 * {@code EXT_FAIL_ENTITY_NAME}. Each test checks that the DAG ends in state ERROR and that the
 * YARN application finishes with FinalApplicationStatus FAILED. Both the DAG and the
 * application-attempt diagnostics must report the expected service error.
 */
public class TestExternalTezServicesErrors {

  private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);

  private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
  private static final String EXT_FAIL_ENTITY_NAME = "ExtServiceTestFail";

  private static ExternalTezServiceTestHelper extServiceTestHelper;

  private static ServicePluginsDescriptor servicePluginsDescriptor;

  private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
  private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
  private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");

  // Judging by the constant names, the create(...) arguments name the task scheduler,
  // container launcher and task communicator, in that order. Each *_FAIL context swaps exactly
  // one of them for the failing entity.
  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
      Vertex.VertexExecutionContext.create(
          EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_FAIL =
      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_FAIL =
      Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_FAIL_ENTITY_NAME);
  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_FAIL =
      Vertex.VertexExecutionContext.create(EXT_FAIL_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);


  private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;

  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
      TestExternalTezServicesErrors.class.getName() + "-tmpDir";

  /**
   * Starts the mini clusters and registers both the working ("push") and the error-injecting
   * ("fail") plugin implementations. It then generates the hash-join data set that every test
   * validates against.
   */
  @BeforeClass
  public static void setup() throws Exception {

    extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
    UserPayload userPayload = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());

    TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
        TaskSchedulerDescriptor
            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
            .setUserPayload(userPayload),
        TaskSchedulerDescriptor.create(EXT_FAIL_ENTITY_NAME,
            TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
            userPayload)};

    ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
        ContainerLauncherDescriptor
            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
            .setUserPayload(userPayload),
        ContainerLauncherDescriptor.create(EXT_FAIL_ENTITY_NAME,
            TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload)};

    TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
        TaskCommunicatorDescriptor
            .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
            .setUserPayload(userPayload),
        TaskCommunicatorDescriptor.create(EXT_FAIL_ENTITY_NAME,
            TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload)};

    servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
        taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);

    extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);

    // Generate the join data set used for each run.
    // Can a timeout be enforced here ?
    Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
    Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
    extServiceTestHelper
        .setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);

    // The shared session is only used for data generation; each test starts its own session
    // in testServiceError.
    extServiceTestHelper.shutdownSharedTezClient();
  }

  /** Stops the shared client, if still running, and all mini clusters. */
  @AfterClass
  public static void tearDown() throws IOException, TezException {
    // JUnit runs @AfterClass even when @BeforeClass fails. If the helper was never constructed,
    // calling into it would add a NullPointerException that hides the real setup failure.
    if (extServiceTestHelper != null) {
      extServiceTestHelper.tearDownAll();
      extServiceTestHelper = null;
    }
  }

  @Test (timeout = 90000)
  public void testContainerLauncherError() throws Exception {
    testServiceError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_FAIL,
        DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR);
  }

  @Test (timeout = 90000)
  public void testTaskCommunicatorError() throws Exception {
    testServiceError("_testTaskCommunicatorError_", EXECUTION_CONTEXT_TASKCOMM_FAIL,
        DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR);
  }

  @Test (timeout = 90000)
  public void testTaskSchedulerError() throws Exception {
    testServiceError("_testTaskSchedulerError_", EXECUTION_CONTEXT_SCHEDULER_FAIL,
        DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR);
  }

  /**
   * Runs the join-validation DAG in a fresh session and checks that both the DAG and the YARN
   * application fail with the expected service error.
   *
   * @param methodName suffix for the session name, to tell the per-test sessions apart
   * @param lhsExecutionContext execution context for the vertex expected to hit the failure
   * @param expectedEventType event type that must appear in the diagnostics
   */
  private void testServiceError(String methodName,
                                Vertex.VertexExecutionContext lhsExecutionContext,
                                DAGAppMasterEventType expectedEventType) throws
      IOException, TezException, InterruptedException, YarnException {
    TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
    TezClient tezClient = TezClient
        .newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
            tezClientConf)
        .setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();

    ApplicationId appId;
    try {
      tezClient.start();
      LOG.info("TezSessionStarted for " + methodName);
      tezClient.waitTillReady();
      LOG.info("TezSession ready for submission for " + methodName);

      JoinValidateConfigured joinValidate =
          new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext,
              EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
              EXECUTION_CONTEXT_EXT_SERVICE_PUSH, "LauncherFailTest");

      DAG dag = joinValidate
          .createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH,
              HASH_JOIN_OUTPUT_PATH, 3);

      DAGClient dagClient = tezClient.submitDAG(dag);

      DAGStatus dagStatus =
          dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
      assertEquals(DAGStatus.State.ERROR, dagStatus.getState());
      boolean foundDiag = false;
      for (String diag : dagStatus.getDiagnostics()) {
        if (isExpectedServiceError(diag, expectedEventType)) {
          foundDiag = true;
          break;
        }
      }
      appId = tezClient.getAppMasterApplicationId();
      assertTrue("Expected a service error diagnostic for " + expectedEventType
          + " in DAG diagnostics: " + dagStatus.getDiagnostics(), foundDiag);
    } finally {
      tezClient.stop();
    }
    // Verify the state of the application.
    if (appId != null) {
      YarnClient yarnClient = YarnClient.createYarnClient();
      try {
        yarnClient.init(tezClientConf);
        yarnClient.start();

        // Poll until the application reaches a terminal state; the test timeout bounds the wait.
        ApplicationReport appReport = yarnClient.getApplicationReport(appId);
        YarnApplicationState appState = appReport.getYarnApplicationState();
        while (!EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED,
            YarnApplicationState.KILLED).contains(appState)) {
          Thread.sleep(200L);
          appReport = yarnClient.getApplicationReport(appId);
          appState = appReport.getYarnApplicationState();
        }

        // TODO Workaround for YARN-4554. AppReport does not provide diagnostics - need to fetch them from ApplicationAttemptReport
        ApplicationAttemptId appAttemptId = appReport.getCurrentApplicationAttemptId();
        ApplicationAttemptReport appAttemptReport =
            yarnClient.getApplicationAttemptReport(appAttemptId);
        String diag = appAttemptReport.getDiagnostics();
        assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
        assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
        assertTrue("Expected a service error diagnostic for " + expectedEventType
            + " in application attempt diagnostics: " + diag,
            isExpectedServiceError(diag, expectedEventType));

      } finally {
        yarnClient.stop();
      }
    }
  }

  /**
   * Returns true if {@code diag} is non-null and contains "Service Error", the expected event
   * type and "Simulated Error". Both the DAG diagnostics and the application-attempt
   * diagnostics are checked with this predicate.
   */
  private static boolean isExpectedServiceError(String diag,
                                                DAGAppMasterEventType expectedEventType) {
    return diag != null
        && diag.contains("Service Error")
        && diag.contains(expectedEventType.toString())
        && diag.contains("Simulated Error");
  }

}
