Repository: tez Updated Branches: refs/heads/master c547dcc80 -> 9c1d8ceed
TEZ-3108. Add support for external services to local mode. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c1d8cee Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c1d8cee Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c1d8cee Branch: refs/heads/master Commit: 9c1d8ceed9eb31a3631e30dc24e41843ab00e8fc Parents: c547dcc Author: Siddharth Seth <[email protected]> Authored: Wed Mar 30 10:21:31 2016 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Mar 30 10:21:31 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + docs/src/site/markdown/localmode.md | 9 + .../java/org/apache/tez/client/LocalClient.java | 43 +++- .../org/apache/tez/dag/app/DAGAppMaster.java | 11 +- .../app/launcher/ContainerLauncherManager.java | 8 +- .../app/launcher/LocalContainerLauncher.java | 12 +- .../tez/dag/app/rm/TaskSchedulerManager.java | 17 +- .../tez/tests/TestExtServicesWithLocalMode.java | 206 +++++++++++++++++++ 8 files changed, 282 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f9d0166..830d7fa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3108. Add support for external services to local mode. TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. TEZ-2967. Vertex start time should be that of first task start time in UI TEZ-3175. 
Add tez client submit host http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/docs/src/site/markdown/localmode.md ---------------------------------------------------------------------- diff --git a/docs/src/site/markdown/localmode.md b/docs/src/site/markdown/localmode.md index ae546a1..26968da 100644 --- a/docs/src/site/markdown/localmode.md +++ b/docs/src/site/markdown/localmode.md @@ -105,3 +105,12 @@ Potential pitfalls when moving from Local Mode to a real cluster - The ObjectRegistry will work within a single task, when running in Local Mode. The behaviour would be different on a real cluster, where it would work across tasks which share the same container. + +Local Mode with External Services + +- When running in local mode, regular container execution is converted + to run within the same process instead of launching containers. +- Execution that is configured to run in external services is unaffected: + Tez still attempts to use those external services for execution. + If configured in this manner, make sure that the external services are running + before using local mode.
http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java index b225523..474f4ca 100644 --- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java +++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java @@ -18,11 +18,13 @@ package org.apache.tez.client; +import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -44,9 +46,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClientHandler; +import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.DAGAppMaster; import org.apache.tez.dag.app.DAGAppMasterState; @@ -333,16 +338,44 @@ public class LocalClient extends FrameworkClient { return thread; } - + // this can be overridden by test code to create a mock app @VisibleForTesting protected DAGAppMaster createDAGAppMaster(ApplicationAttemptId applicationAttemptId, - ContainerId cId, String currentHost, int nmPort, int nmHttpPort, - Clock clock, long appSubmitTime, boolean isSession,
String userDir, - String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) { + ContainerId cId, String currentHost, int nmPort, + int nmHttpPort, + Clock clock, long appSubmitTime, boolean isSession, + String userDir, + String[] localDirs, String[] logDirs, + Credentials credentials, String jobUserName) throws + IOException { + // Read in additional information about external services + AMPluginDescriptorProto amPluginDescriptorProto = + getPluginDescriptorInfo(conf, applicationAttemptId.getApplicationId().toString()); + + return new DAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, new SystemClock(), appSubmitTime, isSession, userDir, localDirs, logDirs, - versionInfo.getVersion(), 1, credentials, jobUserName, null); + versionInfo.getVersion(), 1, credentials, jobUserName, amPluginDescriptorProto); + } + + /** Returns the AM plugin descriptor from the user-specified Tez config in the staging dir, or null if unset. */ + private AMPluginDescriptorProto getPluginDescriptorInfo(Configuration conf, + String applicationIdString) throws + IOException { + Path tezSysStagingPath = TezCommonUtils + .getTezSystemStagingPath(conf, applicationIdString); + // Remove the filesystem qualifier.
+ String unqualifiedPath = tezSysStagingPath.toUri().getPath(); + + DAGProtos.ConfigurationProto confProto = + TezUtilsInternal + .readUserSpecifiedTezConfiguration(unqualifiedPath); + AMPluginDescriptorProto amPluginDescriptorProto = null; + if (confProto.hasAmPluginDescriptor()) { + amPluginDescriptorProto = confProto.getAmPluginDescriptor(); + } + return amPluginDescriptorProto; } } http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 484ae77..7d28497 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -435,7 +435,6 @@ public class DAGAppMaster extends AbstractService { isLocal, defaultPayload); - LOG.info(buildPluginComponentLog(taskSchedulerDescriptors, taskSchedulers, "TaskSchedulers")); LOG.info(buildPluginComponentLog(containerLauncherDescriptors, containerLaunchers, "ContainerLaunchers")); LOG.info(buildPluginComponentLog(taskCommunicatorDescriptors, taskCommunicators, "TaskCommunicators")); @@ -2638,8 +2637,14 @@ public class DAGAppMaster extends AbstractService { UserPayload defaultPayload, BiMap<String, Integer> schedulerPluginMap) { if (isLocal) { - Preconditions.checkState(descriptors.size() == 1 && - descriptors.get(0).getEntityName().equals(TezConstants.getTezUberServicePluginName())); + boolean foundUberServiceName = false; + for (NamedEntityDescriptor descriptor : descriptors) { + if (descriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) { + foundUberServiceName = true; + break; + } + } + Preconditions.checkState(foundUberServiceName, "Local mode requires a plugin named %s", TezConstants.getTezUberServicePluginName()); } else { boolean foundYarn = false; for (int i = 0; i < descriptors.size(); i++) {
http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java index 250afd8..1f5151b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java @@ -69,7 +69,7 @@ public class ContainerLauncherManager extends AbstractService TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, List<NamedEntityDescriptor> containerLauncherDescriptors, - boolean isPureLocalMode) throws TezException { + boolean isLocalMode) throws TezException { super(ContainerLauncherManager.class.getName()); this.isIncompleteCtor = false; @@ -88,7 +88,7 @@ public class ContainerLauncherManager extends AbstractService new ContainerLauncherContextImpl(context, this, taskCommunicatorManagerInterface, userPayload, i); containerLauncherContexts[i] = containerLauncherContext; containerLaunchers[i] = new ContainerLauncherWrapper(createContainerLauncher(containerLauncherDescriptors.get(i), context, - containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isPureLocalMode)); + containerLauncherContext, taskCommunicatorManagerInterface, workingDirectory, i, isLocalMode)); containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i].getContainerLauncher()); } } @@ -145,7 +145,7 @@ public class ContainerLauncherManager extends AbstractService AppContext context, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, - boolean isPureLocalMode) { + boolean isLocalMode) { LOG.info("Creating LocalContainerLauncher"); //
TODO Post TEZ-2003. LocalContainerLauncher is special cased, since it makes use of // extensive internals which are only available at runtime. Will likely require @@ -154,7 +154,7 @@ public class ContainerLauncherManager extends AbstractService return new LocalContainerLauncher(containerLauncherContext, context, taskCommunicatorManagerInterface, - workingDirectory, isPureLocalMode); + workingDirectory, isLocalMode); } catch (UnknownHostException e) { throw new TezUncheckedException(e); } http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java index b737fda..1e9d1e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java @@ -88,7 +88,7 @@ public class LocalContainerLauncher extends ContainerLauncher { private final Map<String, String> localEnv; private final ExecutionContext executionContext; private final int numExecutors; - private final boolean isPureLocalMode; + private final boolean isLocalMode; private final ConcurrentHashMap<ContainerId, RunningTaskCallback> runningContainers = @@ -108,7 +108,7 @@ public class LocalContainerLauncher extends ContainerLauncher { AppContext context, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, String workingDirectory, - boolean isPureLocalMode) throws UnknownHostException { + boolean isLocalMode) throws UnknownHostException { // TODO Post TEZ-2003. Most of this information is dynamic and only available after the AM // starts up. It's not possible to set these up via a static payload.
// Will need some kind of mechanism to dynamically crate payloads / bind to parameters @@ -117,8 +117,8 @@ public class LocalContainerLauncher extends ContainerLauncher { this.context = context; this.tal = taskCommunicatorManagerInterface; this.workingDirectory = workingDirectory; - this.isPureLocalMode = isPureLocalMode; - if (isPureLocalMode) { + this.isLocalMode = isLocalMode; + if (isLocalMode) { localEnv = Maps.newHashMap(); AuxiliaryServiceHelper.setServiceDataIntoEnv( ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(0), localEnv); @@ -127,7 +127,7 @@ public class LocalContainerLauncher extends ContainerLauncher { } // Check if the hostname is set in the environment before overriding it. - String host = isPureLocalMode ? InetAddress.getLocalHost().getHostName() : + String host = isLocalMode ? InetAddress.getLocalHost().getHostName() : System.getenv(Environment.NM_HOST.name()); executionContext = new ExecutionContextImpl(host); @@ -347,7 +347,7 @@ public class LocalContainerLauncher extends ContainerLauncher { Map<String, String> containerEnv = new HashMap<String, String>(); containerEnv.putAll(localEnv); // Use the user from env if it's available. - String user = isPureLocalMode ? System.getenv(Environment.USER.name()) : context.getUser(); + String user = isLocalMode ?
System.getenv(Environment.USER.name()) : context.getUser(); containerEnv.put(Environment.USER.name(), user); long memAvailable; http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index 5317440..16f9a28 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -124,7 +124,7 @@ public class TaskSchedulerManager extends AbstractService implements @VisibleForTesting final ExecutorService appCallbackExecutor; - private final boolean isPureLocalMode; + private final boolean isLocalMode; // If running in non local-only mode, the YARN task scheduler will always run to take care of // registration with YARN and heartbeats to YARN. // Splitting registration and heartbeats is not straight-forward due to the taskScheduler being @@ -159,7 +159,7 @@ public class TaskSchedulerManager extends AbstractService implements this.taskSchedulerDescriptors = null; this.webUI = null; this.historyUrl = null; - this.isPureLocalMode = false; + this.isLocalMode = false; } /** @@ -171,7 +171,7 @@ public class TaskSchedulerManager extends AbstractService implements * @param webUI * @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated. * An empty list defaults to using the YarnTaskScheduler as the only source.
- * @param isPureLocalMode whether the AM is running in local mode + * @param isLocalMode whether the AM is running in local mode */ @SuppressWarnings("rawtypes") public TaskSchedulerManager(AppContext appContext, @@ -179,7 +179,7 @@ public class TaskSchedulerManager extends AbstractService implements ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI, List<NamedEntityDescriptor> schedulerDescriptors, - boolean isPureLocalMode) { + boolean isLocalMode) { super(TaskSchedulerManager.class.getName()); Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(), "TaskSchedulerDescriptors must be specified"); @@ -189,7 +189,7 @@ public class TaskSchedulerManager extends AbstractService implements this.containerSignatureMatcher = containerSignatureMatcher; this.webUI = webUI; this.historyUrl = getHistoryUrl(); - this.isPureLocalMode = isPureLocalMode; + this.isLocalMode = isLocalMode; this.appCallbackExecutor = createAppCallbackExecutorService(); if (this.webUI != null) { this.webUI.setHistoryUrl(this.historyUrl); @@ -579,8 +579,11 @@ public class TaskSchedulerManager extends AbstractService implements int j = 0; for (int i = 0; i < taskSchedulerDescriptors.length; i++) { long customAppIdIdentifier; - if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals( - TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId. + if ((isLocalMode && taskSchedulerDescriptors[i].getEntityName() + .equals(TezConstants.getTezUberServicePluginName())) || + taskSchedulerDescriptors[i].getEntityName() + .equals(TezConstants.getTezYarnServicePluginName())) { + // Uber (local mode only) and YARN schedulers reuse the provided appId; other schedulers get a generated one.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp(); } else { customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT); http://git-wip-us.apache.org/repos/asf/tez/blob/9c1d8cee/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java new file mode 100644 index 0000000..3d8c087 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExtServicesWithLocalMode.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tez.tests; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher; +import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService; +import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl; +import org.apache.tez.examples.HashJoinExample; +import org.apache.tez.examples.JoinDataGen; +import org.apache.tez.examples.JoinValidateConfigured; +import org.apache.tez.service.MiniTezTestServiceCluster; +import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor; +import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; +import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor; +import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Runs DAGs in Tez local mode with some vertices executed by an external test service cluster. */ +public class TestExtServicesWithLocalMode { + + private static final Logger LOG = LoggerFactory.getLogger(TestExtServicesWithLocalMode.class); + + private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush"; + + private static final String TEST_ROOT_DIR = + "target" + Path.SEPARATOR + TestExtServicesWithLocalMode.class.getName() + + "-tmpDir"; + + private static final Path SRC_DATA_DIR = new Path(TEST_ROOT_DIR + Path.SEPARATOR + "data"); + private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = + new Path(SRC_DATA_DIR, "expectedOutputPath"); + private static final Path HASH_JOIN_OUTPUT_PATH = new
Path(SRC_DATA_DIR, "outPath"); + + private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = + Vertex.VertexExecutionContext.create( + EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME); + + private static volatile Configuration clusterConf = new Configuration(); + private static volatile FileSystem localFs; + private static volatile MiniTezTestServiceCluster tezTestServiceCluster; + + private static volatile Configuration confForJobs; + + @BeforeClass + public static void setup() throws Exception { + + localFs = FileSystem.getLocal(clusterConf).getRaw(); + long jvmMax = Runtime.getRuntime().maxMemory(); + tezTestServiceCluster = MiniTezTestServiceCluster + .create(TestExtServicesWithLocalMode.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1); + tezTestServiceCluster.init(clusterConf); + tezTestServiceCluster.start(); + LOG.info("MiniTezTestServer started"); + + confForJobs = new Configuration(clusterConf); + for (Map.Entry<String, String> entry : tezTestServiceCluster + .getClusterSpecificConfiguration()) { + confForJobs.set(entry.getKey(), entry.getValue()); + } + confForJobs.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + } + + @AfterClass + public static void tearDown() throws IOException, TezException { + if (tezTestServiceCluster != null) { + tezTestServiceCluster.stop(); + tezTestServiceCluster = null; + } + + if (localFs != null) { + Path testRootDirPath = new Path(TEST_ROOT_DIR); + testRootDirPath = localFs.makeQualified(testRootDirPath); + LOG.info("Cleaning up path: " + testRootDirPath); + localFs.delete(testRootDirPath, true); + } + } + + + @Test(timeout = 30000) + public void test1() throws Exception { + + UserPayload userPayload = TezUtils.createUserPayloadFromConf(confForJobs); + + TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{ + TaskSchedulerDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()) + .setUserPayload(userPayload)}; +
ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{ + ContainerLauncherDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()) + .setUserPayload(userPayload)}; + + TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{ + TaskCommunicatorDescriptor + .create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()) + .setUserPayload(userPayload)}; + + ServicePluginsDescriptor servicePluginsDescriptor = ServicePluginsDescriptor.create(true, false, + taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors); + + + TezConfiguration tezConf = new TezConfiguration(confForJobs); + + TezClient tezClient = TezClient.newBuilder("test1", tezConf).setIsSession(true) + .setServicePluginDescriptor(servicePluginsDescriptor).build(); + try { + tezClient.start(); + + + Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1"); + Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2"); + + // Generate both join inputs and the expected result, then run the join into HASH_JOIN_OUTPUT_PATH. + JoinDataGen dataGen = new JoinDataGen(); + String[] dataGenArgs = new String[]{ + dataPath1.toString(), "1048576", dataPath2.toString(), "524288", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"}; + + assertEquals(0, dataGen.run(tezConf, dataGenArgs, tezClient)); + + HashJoinExample joinExample = new HashJoinExample(); + String[] args = new String[]{ + dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()}; + assertEquals(0, joinExample.run(tezConf, args, tezClient)); + LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result"); + + // Expect no external submissions so far: data generation and the join ran in local mode. + assertEquals(0, tezTestServiceCluster.getNumSubmissions()); + + // ext can consume from ext.
+ runJoinValidate(tezClient, "allInExt", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, + EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH); + LOG.info("Completed allInExt"); + + // uber can consume from uber. + runJoinValidate(tezClient, "noneInExt", 0, null, null, null); + LOG.info("Completed noneInExt"); + + // uber can consume from ext + runJoinValidate(tezClient, "lhsInExt", 2, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, null, null); + LOG.info("Completed lhsInExt"); + + // ext cannot consume from uber in this mode since there's no shuffle handler working, + // and the local data transfer semantics may not match. + + } finally { + tezClient.stop(); + } + } + + /** Runs JoinValidate with the given vertex execution contexts and checks the new external submission count. */ + private void runJoinValidate(TezClient tezClient, String name, int extExpectedCount, + Vertex.VertexExecutionContext lhsContext, + Vertex.VertexExecutionContext rhsContext, + Vertex.VertexExecutionContext validateContext) throws + Exception { + int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions(); + + TezConfiguration tezConf = new TezConfiguration(confForJobs); + JoinValidateConfigured joinValidate = + new JoinValidateConfigured(null, lhsContext, rhsContext, + validateContext, name); + String[] validateArgs = new String[]{"-disableSplitGrouping", + HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}; + assertEquals(0, joinValidate.run(tezConf, validateArgs, tezClient)); + + // Ensure this was actually submitted to the external cluster + assertEquals(extExpectedCount, + (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount)); + } +}
