Repository: tez Updated Branches: refs/heads/TEZ-2003 bd6fcf95d -> 5fdbe04ff (forced update)
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java new file mode 100644 index 0000000..a93c1a4 --- /dev/null +++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java @@ -0,0 +1,183 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tez.tests;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
+import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
+import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.JoinDataGen;
+import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.service.MiniTezTestServiceCluster;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end test that runs join DAGs through a shared Tez session whose AM is configured to use
+ * the test "external service" plugins ({@link TezTestServiceTaskSchedulerService},
+ * {@link TezTestServiceNoOpContainerLauncher} and {@link TezTestServiceTaskCommunicatorImpl})
+ * backed by a {@link MiniTezTestServiceCluster}.
+ */
+public class TestExternalTezServices {
+
+  private static final Log LOG = LogFactory.getLog(TestExternalTezServices.class);
+
+  private static MiniTezCluster tezCluster;
+  private static MiniDFSCluster dfsCluster;
+  private static MiniTezTestServiceCluster tezTestServiceCluster;
+
+  // Base configuration shared by all mini clusters; populated in setup().
+  private static final Configuration clusterConf = new Configuration();
+  // clusterConf plus the test-service cluster settings and AM plugin classes used by the jobs.
+  private static Configuration confForJobs;
+
+  private static FileSystem remoteFs;
+  // Local filesystem; used in tearDown() to remove TEST_ROOT_DIR.
+  private static FileSystem localFs;
+
+  private static TezClient sharedTezClient;
+
+  // Local base directory handed to MiniDFSCluster via HDFS_MINIDFS_BASEDIR.
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestExternalTezServices.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a MiniDFSCluster, a MiniTezCluster and a MiniTezTestServiceCluster, then opens a
+   * session-mode TezClient whose AM uses the test-service scheduler, launcher and communicator.
+   */
+  @BeforeClass
+  public static void setup() throws IOException, TezException, InterruptedException {
+
+    localFs = FileSystem.getLocal(clusterConf);
+
+    try {
+      clusterConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+      LOG.info("MiniDFSCluster started");
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    tezCluster = new MiniTezCluster(TestExternalTezServices.class.getName(), 1, 1, 1);
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    tezCluster.init(conf);
+    tezCluster.start();
+    LOG.info("MiniTezCluster started");
+
+    clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+    for (Map.Entry<String, String> entry : tezCluster.getConfig()) {
+      clusterConf.set(entry.getKey(), entry.getValue());
+    }
+    long jvmMax = Runtime.getRuntime().maxMemory();
+
+    tezTestServiceCluster = MiniTezTestServiceCluster
+        .create(TestExternalTezServices.class.getSimpleName(), 3, ((long) (jvmMax * 0.5d)), 1);
+    tezTestServiceCluster.init(clusterConf);
+    tezTestServiceCluster.start();
+    LOG.info("MiniTezTestServer started");
+
+    confForJobs = new Configuration(clusterConf);
+    for (Map.Entry<String, String> entry : tezTestServiceCluster
+        .getClusterSpecificConfiguration()) {
+      confForJobs.set(entry.getKey(), entry.getValue());
+    }
+
+    // TODO TEZ-2003 Once per vertex configuration is possible, run separate tests for push vs pull (regular threaded execution)
+
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    remoteFs.mkdirs(stagingDirPath);
+    // This is currently configured to push tasks into the Service, and then use the standard RPC
+    confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS,
+        TezTestServiceTaskSchedulerService.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS,
+        TezTestServiceNoOpContainerLauncher.class.getName());
+    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATOR_CLASS,
+        TezTestServiceTaskCommunicatorImpl.class.getName());
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
+        tezConf, true);
+    sharedTezClient.start();
+    LOG.info("Shared TezSession started");
+    sharedTezClient.waitTillReady();
+    LOG.info("Shared TezSession ready for submission");
+
+  }
+
+  /**
+   * Stops everything started by {@link #setup()} in reverse start order, then deletes the local
+   * MiniDFSCluster base directory. Each step runs in a {@code finally} so that a failure while
+   * stopping one component does not leak the components that are stopped after it.
+   */
+  @AfterClass
+  public static void tearDown() throws IOException, TezException {
+    try {
+      if (sharedTezClient != null) {
+        sharedTezClient.stop();
+        sharedTezClient = null;
+      }
+    } finally {
+      try {
+        if (tezTestServiceCluster != null) {
+          tezTestServiceCluster.stop();
+          tezTestServiceCluster = null;
+        }
+      } finally {
+        try {
+          if (tezCluster != null) {
+            tezCluster.stop();
+            tezCluster = null;
+          }
+        } finally {
+          try {
+            if (dfsCluster != null) {
+              dfsCluster.shutdown();
+              dfsCluster = null;
+            }
+          } finally {
+            // setup() rooted the MiniDFSCluster at TEST_ROOT_DIR on local disk; remove it.
+            if (localFs != null) {
+              localFs.delete(new Path(TEST_ROOT_DIR), true);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Generates join input with JoinDataGen, joins it with HashJoinExample, checks the result with
+   * JoinValidate (all through the shared session), and asserts that the external test service
+   * actually received task submissions.
+   */
+  @Test(timeout = 60000)
+  public void test1() throws Exception {
+    Path testDir = new Path("/tmp/testHashJoinExample");
+
+    remoteFs.mkdirs(testDir);
+
+    Path dataPath1 = new Path(testDir, "inPath1");
+    Path dataPath2 = new Path(testDir, "inPath2");
+    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
+    Path outPath = new Path(testDir, "outPath");
+
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        expectedOutputPath.toString(), "2"};
+    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
+    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+
+    JoinValidate joinValidate = new JoinValidate();
+    String[] validateArgs = new String[]{
+        expectedOutputPath.toString(), outPath.toString(), "3"};
+    assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
+
+    // Ensure this was actually submitted to the external cluster
+    assertTrue(tezTestServiceCluster.getNumSubmissions() > 0);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
new file mode 100644
index 0000000..60ebc53
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.GroupInputSpec;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+
+/**
+ * Converts between the runtime task-specification objects ({@link TaskSpec}, {@link InputSpec},
+ * {@link OutputSpec}, {@link GroupInputSpec}) and their protobuf forms from
+ * {@code TezTestServiceProtocolProtos}.
+ *
+ * <p>Entity descriptors that are {@code null} on the Java side are left unset in the proto, and
+ * descriptor fields that are unset in the proto are converted back to {@code null}.
+ */
+public class ProtoConverters {
+
+  /**
+   * Rebuilds a {@link TaskSpec} from its proto form.
+   *
+   * @param taskSpecProto the serialized task spec
+   * @return the equivalent TaskSpec; the processor descriptor is {@code null} when unset
+   */
+  public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
+    TezTaskAttemptID taskAttemptID =
+        TezTaskAttemptID.fromString(taskSpecProto.getTaskAttemptIdString());
+
+    ProcessorDescriptor processorDescriptor = null;
+    if (taskSpecProto.hasProcessorDescriptor()) {
+      processorDescriptor = DagTypeConverters
+          .convertProcessorDescriptorFromDAGPlan(taskSpecProto.getProcessorDescriptor());
+    }
+
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>(taskSpecProto.getInputSpecsCount());
+    for (IOSpecProto inputSpecProto : taskSpecProto.getInputSpecsList()) {
+      inputSpecList.add(getInputSpecFromProto(inputSpecProto));
+    }
+
+    List<OutputSpec> outputSpecList =
+        new ArrayList<OutputSpec>(taskSpecProto.getOutputSpecsCount());
+    for (IOSpecProto outputSpecProto : taskSpecProto.getOutputSpecsList()) {
+      outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
+    }
+
+    List<GroupInputSpec> groupInputSpecs =
+        new ArrayList<GroupInputSpec>(taskSpecProto.getGroupedInputSpecsCount());
+    for (GroupInputSpecProto groupInputSpecProto : taskSpecProto.getGroupedInputSpecsList()) {
+      groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
+    }
+
+    return new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
+        taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList, outputSpecList,
+        groupInputSpecs);
+  }
+
+  /**
+   * Serializes a {@link TaskSpec} to its proto form.
+   *
+   * @param taskSpec the spec to convert; its processor descriptor and input/output/group lists
+   *     may be {@code null}, in which case the corresponding proto fields are left empty
+   * @return the proto form of {@code taskSpec}
+   */
+  public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
+    TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+    builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
+    builder.setDagName(taskSpec.getDAGName());
+    builder.setVertexName(taskSpec.getVertexName());
+    builder.setVertexParallelism(taskSpec.getVertexParallelism());
+
+    if (taskSpec.getProcessorDescriptor() != null) {
+      builder.setProcessorDescriptor(
+          DagTypeConverters.convertToDAGPlan(taskSpec.getProcessorDescriptor()));
+    }
+
+    if (taskSpec.getInputs() != null) {
+      for (InputSpec inputSpec : taskSpec.getInputs()) {
+        builder.addInputSpecs(convertInputSpecToProto(inputSpec));
+      }
+    }
+
+    if (taskSpec.getOutputs() != null) {
+      for (OutputSpec outputSpec : taskSpec.getOutputs()) {
+        builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
+      }
+    }
+
+    if (taskSpec.getGroupInputs() != null) {
+      for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
+        builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an {@link InputSpec} from an {@link IOSpecProto}.
+   *
+   * @param inputSpecProto proto whose connected vertex name is the input's source vertex
+   * @return the equivalent InputSpec; the input descriptor is {@code null} when unset
+   */
+  public static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
+    InputDescriptor inputDescriptor = null;
+    if (inputSpecProto.hasIoDescriptor()) {
+      inputDescriptor =
+          DagTypeConverters.convertInputDescriptorFromDAGPlan(inputSpecProto.getIoDescriptor());
+    }
+    return new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
+        inputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /**
+   * Serializes an {@link InputSpec}; its source vertex name becomes the connected vertex name.
+   * A {@code null} source vertex name or input descriptor is left unset.
+   */
+  public static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (inputSpec.getSourceVertexName() != null) {
+      builder.setConnectedVertexName(inputSpec.getSourceVertexName());
+    }
+    if (inputSpec.getInputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(inputSpec.getInputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an {@link OutputSpec} from an {@link IOSpecProto}.
+   *
+   * @param outputSpecProto proto whose connected vertex name is the output's destination vertex
+   * @return the equivalent OutputSpec; the output descriptor is {@code null} when unset
+   */
+  public static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
+    OutputDescriptor outputDescriptor = null;
+    if (outputSpecProto.hasIoDescriptor()) {
+      outputDescriptor =
+          DagTypeConverters.convertOutputDescriptorFromDAGPlan(outputSpecProto.getIoDescriptor());
+    }
+    return new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
+        outputSpecProto.getPhysicalEdgeCount());
+  }
+
+  /**
+   * Serializes an {@link OutputSpec}; its destination vertex name becomes the connected vertex
+   * name. A {@code null} destination vertex name or output descriptor is left unset.
+   */
+  public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
+    IOSpecProto.Builder builder = IOSpecProto.newBuilder();
+    if (outputSpec.getDestinationVertexName() != null) {
+      builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
+    }
+    if (outputSpec.getOutputDescriptor() != null) {
+      builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(outputSpec.getOutputDescriptor()));
+    }
+    builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds a {@link GroupInputSpec} from its proto form.
+   *
+   * @param groupInputSpecProto the serialized group input spec
+   * @return the equivalent GroupInputSpec; the merged input descriptor is {@code null} when the
+   *     proto does not carry one (matching how the input/output converters treat descriptors)
+   */
+  public static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
+    InputDescriptor mergedInputDescriptor = null;
+    if (groupInputSpecProto.hasMergedInputDescriptor()) {
+      mergedInputDescriptor = DagTypeConverters
+          .convertInputDescriptorFromDAGPlan(groupInputSpecProto.getMergedInputDescriptor());
+    }
+    return new GroupInputSpec(groupInputSpecProto.getGroupName(),
+        groupInputSpecProto.getGroupVerticesList(), mergedInputDescriptor);
+  }
+
+  /**
+   * Serializes a {@link GroupInputSpec}. A {@code null} merged input descriptor is left unset
+   * rather than being passed to {@code DagTypeConverters.convertToDAGPlan}.
+   */
+  public static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
+    GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
+    builder.setGroupName(groupInputSpec.getGroupName());
+    builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
+    if (groupInputSpec.getMergedInputDescriptor() != null) {
+      builder.setMergedInputDescriptor(
+          DagTypeConverters.convertToDAGPlan(groupInputSpec.getMergedInputDescriptor()));
+    }
+    return builder.build();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
new file mode 100644
index 0000000..2f8b2e6
--- /dev/null
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.test.service.rpc";
+option java_outer_classname = "TezTestServiceProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "DAGApiRecords.proto";
+
+// Serialized InputSpec or OutputSpec (converted by ProtoConverters). For an input,
+// connected_vertex_name holds the source vertex; for an output, the destination vertex.
+// An unset io_descriptor corresponds to a null descriptor on the Java side.
+message IOSpecProto {
+  optional string connected_vertex_name = 1;
+  optional TezEntityDescriptorProto io_descriptor = 2;
+  optional int32 physical_edge_count = 3;
+}
+
+// Serialized GroupInputSpec: the group name, the names of the member vertices, and the
+// descriptor of the merged input.
+message GroupInputSpecProto {
+  optional string group_name = 1;
+  repeated string group_vertices = 2;
+  optional TezEntityDescriptorProto merged_input_descriptor = 3;
+}
+
+// Serialized TaskSpec. task_attempt_id_string is the string form of a TezTaskAttemptID
+// (parsed back with TezTaskAttemptID.fromString).
+message TaskSpecProto {
+  optional string task_attempt_id_string = 1;
+  optional string dag_name = 2;
+  optional string vertex_name = 3;
+  optional TezEntityDescriptorProto processor_descriptor = 4;
+  repeated IOSpecProto input_specs = 5;
+  repeated IOSpecProto output_specs = 6;
+  repeated GroupInputSpecProto grouped_input_specs = 7;
+  optional int32 vertex_parallelism = 8;
+}
+
+
+// Request for TezTestServiceProtocol.submitWork; carries a single task_spec.
+// NOTE(review): am_host/am_port/token_identifier/credentials_binary presumably identify and
+// authenticate the AM the task reports back to — confirm against the service implementation.
+message SubmitWorkRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+  optional TaskSpecProto task_spec = 9;
+}
+
+// Empty response to submitWork.
+message SubmitWorkResponseProto {
+}
+
+
+
+// Request for TezTestServiceProtocol.runContainer: the same fields as SubmitWorkRequestProto
+// (numbers 1-8) but without a task_spec.
+message RunContainerRequestProto {
+  optional string container_id_string = 1;
+  optional string am_host = 2;
+  optional int32 am_port = 3;
+  optional string token_identifier = 4;
+  optional bytes credentials_binary = 5;
+  optional string user = 6;
+  optional string application_id_string = 7;
+  optional int32 app_attempt_number = 8;
+}
+
+// Empty response to runContainer.
+message RunContainerResponseProto {
+}
+
+// RPC interface exposed by the test service (java_generic_services = true above generates
+// the Java service stubs).
+service TezTestServiceProtocol {
+  rpc runContainer(RunContainerRequestProto) returns (RunContainerResponseProto);
+  rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-ext-service-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/tez-ext-service-tests/src/test/resources/log4j.properties b/tez-ext-service-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..531b68b --- /dev/null +++ b/tez-ext-service-tests/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + # log4j configuration used during build and unit tests + + log4j.rootLogger=info,stdout + log4j.threshold=ALL + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 94efbee..e6ef5e2 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -404,7 +404,7 @@ public class TezChild { private final Throwable throwable; private final String errorMessage; - ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable, + public ContainerExecutionResult(ExitStatus exitStatus, @Nullable Throwable throwable, @Nullable String errorMessage) { this.exitStatus = exitStatus; this.throwable = throwable; http://git-wip-us.apache.org/repos/asf/tez/blob/36e7f854/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index de83889..f54814b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -67,7 +67,7 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { private final AtomicBoolean taskRunning;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); - TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, + public TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, TaskSpec taskSpec, int appAttemptNumber, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> serviceProviderEnvMap, Multimap<String, String> startedInputsMap, TaskReporter taskReporter,
