http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java index ec6e439..e43b72b 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java @@ -22,9 +22,11 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.dag.api.EntityDescriptor; import org.apache.tez.dag.api.InputDescriptor; @@ -33,7 +35,10 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -41,55 +46,88 @@ import org.apache.tez.runtime.api.impl.TaskSpec; public class Converters { - public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) { - TezTaskAttemptID taskAttemptID = - TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString()); + public static TaskSpec getTaskSpecfromProto(SignableVertexSpec vectorProto, + int fragmentNum, int attemptNum, TezTaskAttemptID attemptId) { + VertexIdentifier vertexId = vectorProto.getVertexIdentifier(); + TezTaskAttemptID taskAttemptID = attemptId != null ? attemptId + : createTaskAttemptId(vertexId, fragmentNum, attemptNum); ProcessorDescriptor processorDescriptor = null; - if (FragmentSpecProto.hasProcessorDescriptor()) { + if (vectorProto.hasProcessorDescriptor()) { processorDescriptor = convertProcessorDescriptorFromProto( - FragmentSpecProto.getProcessorDescriptor()); + vectorProto.getProcessorDescriptor()); } - List<InputSpec> inputSpecList = new ArrayList<InputSpec>(FragmentSpecProto.getInputSpecsCount()); - if (FragmentSpecProto.getInputSpecsCount() > 0) { - for (IOSpecProto inputSpecProto : FragmentSpecProto.getInputSpecsList()) { + List<InputSpec> inputSpecList = new ArrayList<InputSpec>(vectorProto.getInputSpecsCount()); + if (vectorProto.getInputSpecsCount() > 0) { + for (IOSpecProto inputSpecProto : vectorProto.getInputSpecsList()) { inputSpecList.add(getInputSpecFromProto(inputSpecProto)); } } List<OutputSpec> outputSpecList = - new ArrayList<OutputSpec>(FragmentSpecProto.getOutputSpecsCount()); - if (FragmentSpecProto.getOutputSpecsCount() > 0) { - for (IOSpecProto outputSpecProto : FragmentSpecProto.getOutputSpecsList()) { + new ArrayList<OutputSpec>(vectorProto.getOutputSpecsCount()); + if (vectorProto.getOutputSpecsCount() > 0) { + for (IOSpecProto outputSpecProto : vectorProto.getOutputSpecsList()) { outputSpecList.add(getOutputSpecFromProto(outputSpecProto)); } } List<GroupInputSpec> groupInputSpecs = - new ArrayList<GroupInputSpec>(FragmentSpecProto.getGroupedInputSpecsCount()); - if (FragmentSpecProto.getGroupedInputSpecsCount() > 0) { - for (GroupInputSpecProto groupInputSpecProto : FragmentSpecProto.getGroupedInputSpecsList()) { + new ArrayList<GroupInputSpec>(vectorProto.getGroupedInputSpecsCount()); + if (vectorProto.getGroupedInputSpecsCount() > 0) { + for (GroupInputSpecProto groupInputSpecProto : vectorProto.getGroupedInputSpecsList()) { groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto)); } } TaskSpec taskSpec = - new TaskSpec(taskAttemptID, FragmentSpecProto.getDagName(), FragmentSpecProto.getVertexName(), - FragmentSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList, + new TaskSpec(taskAttemptID, vectorProto.getDagName(), vectorProto.getVertexName(), + vectorProto.getVertexParallelism(), processorDescriptor, inputSpecList, outputSpecList, groupInputSpecs); return taskSpec; } - public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) { - FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); - builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); + public static TezTaskAttemptID createTaskAttemptId( + VertexIdentifier vertexId, int fragmentNum, int attemptNum) { + // Come ride the API roller-coaster! + return TezTaskAttemptID.getInstance( + TezTaskID.getInstance( + TezVertexID.getInstance( + TezDAGID.getInstance( + ConverterUtils.toApplicationId( + vertexId.getApplicationIdString()), + vertexId.getDagId()), + vertexId.getVertexId()), + fragmentNum), + attemptNum); + } + + public static VertexIdentifier createVertexIdentifier( + TezTaskAttemptID taId, int appAttemptId) { + VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder(); + idBuilder.setApplicationIdString( + taId.getTaskID().getVertexID().getDAGId().getApplicationId().toString()); + idBuilder.setAppAttemptNumber(appAttemptId); + idBuilder.setDagId(taId.getTaskID().getVertexID().getDAGId().getId()); + idBuilder.setVertexId(taId.getTaskID().getVertexID().getId()); + return idBuilder.build(); + } + + public static SignableVertexSpec convertTaskSpecToProto(TaskSpec taskSpec, + int appAttemptId, String tokenIdentifier, Integer signatureKeyId, String user) { + TezTaskAttemptID tId = taskSpec.getTaskAttemptID(); + + SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder(); + builder.setVertexIdentifier(createVertexIdentifier(tId, appAttemptId)); builder.setDagName(taskSpec.getDAGName()); - builder.setDagId(taskSpec.getDagIdentifier()); builder.setVertexName(taskSpec.getVertexName()); builder.setVertexParallelism(taskSpec.getVertexParallelism()); - builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); - builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId()); + builder.setTokenIdentifier(tokenIdentifier); + builder.setUser(user); + if (signatureKeyId != null) { + builder.setSignatureKeyId(signatureKeyId); + } if (taskSpec.getProcessorDescriptor() != null) { builder.setProcessorDescriptor(
http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 5cdc02e..486ba0a 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -46,19 +46,38 @@ message GroupInputSpecProto { optional EntityDescriptorProto merged_input_descriptor = 3; } +message VertexIdentifier { + optional string application_id_string = 1; + optional int32 app_attempt_number = 2; + optional int32 dag_id = 3; + optional int32 vertex_id = 4; +} + +// The part of SubmitWork that can be signed +message SignableVertexSpec +{ + optional string user = 1; + optional int64 signatureKeyId = 2; + + optional VertexIdentifier vertexIdentifier = 3; + // Display names cannot be modified by the client for now. If needed, they should be sent to HS2 who will put them here. + optional string dag_name = 4; + optional string vertex_name = 5; + + // The core vertex stuff + optional string token_identifier = 6; + optional EntityDescriptorProto processor_descriptor = 7; + repeated IOSpecProto input_specs = 8; + repeated IOSpecProto output_specs = 9; + repeated GroupInputSpecProto grouped_input_specs = 10; + + optional int32 vertex_parallelism = 11; // An internal field required for Tez. +} -message FragmentSpecProto { - optional string fragment_identifier_string = 1; - optional string dag_name = 2; - optional int32 dag_id = 11; - optional string vertex_name = 3; - optional EntityDescriptorProto processor_descriptor = 4; - repeated IOSpecProto input_specs = 5; - repeated IOSpecProto output_specs = 6; - repeated GroupInputSpecProto grouped_input_specs = 7; - optional int32 vertex_parallelism = 8; - optional int32 fragment_number =9; - optional int32 attempt_number = 10; +// Union +message VertexOrBinary { + optional SignableVertexSpec vertex = 1; + optional bytes vertexBinary = 2; // SignableVertexSpec } message FragmentRuntimeInfo { @@ -81,18 +100,24 @@ message QueryIdentifierProto { } message SubmitWorkRequestProto { - optional string container_id_string = 1; - optional string am_host = 2; - optional int32 am_port = 3; - optional string token_identifier = 4; - optional bytes credentials_binary = 5; - optional string user = 6; - optional string application_id_string = 7; - optional int32 app_attempt_number = 8; - optional FragmentSpecProto fragment_spec = 9; - optional FragmentRuntimeInfo fragment_runtime_info = 10; + optional VertexOrBinary work_spec = 1; + optional bytes work_spec_signature = 2; + + optional int32 fragment_number = 3; + optional int32 attempt_number = 4; + + optional string container_id_string = 5; + optional string am_host = 6; + optional int32 am_port = 7; + + // Credentials are not signed - the client can add e.g. his own HDFS tokens. + optional bytes credentials_binary = 8; + + // Not supported/honored for external clients right now. + optional FragmentRuntimeInfo fragment_runtime_info = 9; } + enum SubmissionStateProto { ACCEPTED = 1; REJECTED = 2; http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java index d4cdac1..349ee14 100644 --- a/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java +++ b/llap-common/src/test/org/apache/hadoop/hive/llap/tez/TestConverters.java @@ -23,8 +23,8 @@ import java.util.List; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.UserPayloadProto; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.api.InputDescriptor; @@ -77,28 +77,24 @@ public class TestConverters { new TaskSpec(tezTaskAttemptId, "dagName", "vertexName", 10, processorDescriptor, inputSpecList, outputSpecList, null); + SignableVertexSpec vertexProto = Converters.convertTaskSpecToProto(taskSpec, 0, "", null, ""); - FragmentSpecProto fragmentSpecProto = Converters.convertTaskSpecToProto(taskSpec); - - - assertEquals("dagName", fragmentSpecProto.getDagName()); - assertEquals("vertexName", fragmentSpecProto.getVertexName()); - assertEquals(tezTaskAttemptId.toString(), fragmentSpecProto.getFragmentIdentifierString()); - assertEquals(tezDagId.getId(), fragmentSpecProto.getDagId()); - assertEquals(tezTaskAttemptId.getId(), fragmentSpecProto.getAttemptNumber()); - assertEquals(tezTaskId.getId(), fragmentSpecProto.getFragmentNumber()); + assertEquals("dagName", vertexProto.getDagName()); + assertEquals("vertexName", vertexProto.getVertexName()); + assertEquals(appId.toString(), vertexProto.getVertexIdentifier().getApplicationIdString()); + assertEquals(tezDagId.getId(), vertexProto.getVertexIdentifier().getDagId()); assertEquals(processorDescriptor.getClassName(), - fragmentSpecProto.getProcessorDescriptor().getClassName()); + vertexProto.getProcessorDescriptor().getClassName()); assertEquals(processorDescriptor.getUserPayload().getPayload(), - fragmentSpecProto.getProcessorDescriptor().getUserPayload().getUserPayload() + vertexProto.getProcessorDescriptor().getUserPayload().getUserPayload() .asReadOnlyByteBuffer()); - assertEquals(2, fragmentSpecProto.getInputSpecsCount()); - assertEquals(2, fragmentSpecProto.getOutputSpecsCount()); + assertEquals(2, vertexProto.getInputSpecsCount()); + assertEquals(2, vertexProto.getOutputSpecsCount()); - verifyInputSpecAndProto(inputSpec1, fragmentSpecProto.getInputSpecs(0)); - verifyInputSpecAndProto(inputSpec2, fragmentSpecProto.getInputSpecs(1)); - verifyOutputSpecAndProto(outputSpec1, fragmentSpecProto.getOutputSpecs(0)); - verifyOutputSpecAndProto(outputSpec2, fragmentSpecProto.getOutputSpecs(1)); + verifyInputSpecAndProto(inputSpec1, vertexProto.getInputSpecs(0)); + verifyInputSpecAndProto(inputSpec2, vertexProto.getInputSpecs(1)); + verifyOutputSpecAndProto(outputSpec1, vertexProto.getOutputSpecs(0)); + verifyOutputSpecAndProto(outputSpec2, vertexProto.getOutputSpecs(1)); } @@ -120,11 +116,10 @@ public class TestConverters { TezTaskID tezTaskId = TezTaskID.getInstance(tezVertexId, 500); TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tezTaskId, 600); - FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); - builder.setFragmentIdentifierString(tezTaskAttemptId.toString()); + SignableVertexSpec.Builder builder = SignableVertexSpec.newBuilder(); + builder.setVertexIdentifier(Converters.createVertexIdentifier(tezTaskAttemptId, 0)); builder.setDagName("dagName"); builder.setVertexName("vertexName"); - builder.setDagId(tezDagId.getId()); builder.setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("fakeProcessorName").setUserPayload( UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(procBb)))); @@ -145,9 +140,9 @@ public class TestConverters { EntityDescriptorProto.newBuilder().setClassName("outputClassName").setUserPayload( UserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(output1Bb))))); - FragmentSpecProto fragmentSpecProto = builder.build(); + SignableVertexSpec vertexProto = builder.build(); - TaskSpec taskSpec = Converters.getTaskSpecfromProto(fragmentSpecProto); + TaskSpec taskSpec = Converters.getTaskSpecfromProto(vertexProto, 0, 0, null); assertEquals("dagName", taskSpec.getDAGName()); assertEquals("vertexName", taskSpec.getVertexName()); @@ -160,12 +155,10 @@ public class TestConverters { assertEquals(2, taskSpec.getInputs().size()); assertEquals(2, taskSpec.getOutputs().size()); - verifyInputSpecAndProto(taskSpec.getInputs().get(0), fragmentSpecProto.getInputSpecs(0)); - verifyInputSpecAndProto(taskSpec.getInputs().get(1), fragmentSpecProto.getInputSpecs(1)); - verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), fragmentSpecProto.getOutputSpecs(0)); - verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), fragmentSpecProto.getOutputSpecs(1)); - - + verifyInputSpecAndProto(taskSpec.getInputs().get(0), vertexProto.getInputSpecs(0)); + verifyInputSpecAndProto(taskSpec.getInputs().get(1), vertexProto.getInputSpecs(1)); + verifyOutputSpecAndProto(taskSpec.getOutputs().get(0), vertexProto.getOutputSpecs(0)); + verifyOutputSpecAndProto(taskSpec.getOutputs().get(1), vertexProto.getOutputSpecs(1)); } private void verifyInputSpecAndProto(InputSpec inputSpec, http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 78b37f7..2bfe3ed 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -33,11 +33,11 @@ import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmissionStateProto; @@ -45,7 +45,9 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; @@ -151,32 +153,35 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws IOException { - HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), request.getFragmentSpec().getDagName(), request.getFragmentSpec().getDagId(), - request.getFragmentSpec().getVertexName(), request.getFragmentSpec().getFragmentNumber(), - request.getFragmentSpec().getAttemptNumber()); + // TODO: also support binary. Actually, we should figure out the binary stuff here and + // stop passing the protobuf around. We should pass around some plain objects/values. + SignableVertexSpec vertex = request.getWorkSpec().getVertex(); if (LOG.isInfoEnabled()) { - LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); + LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex)); } + VertexIdentifier vId = vertex.getVertexIdentifier(); + TezTaskAttemptID attemptId = Converters.createTaskAttemptId( + vId, request.getFragmentNumber(), request.getAttemptNumber()); + String fragmentIdString = attemptId.toString(); + HistoryLogger.logFragmentStart(vId.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), vertex.getDagName(), vId.getDagId(), + vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. - String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); - NDC.push(ndcContextString); + NDC.push(fragmentIdString); Scheduler.SubmissionState submissionState; SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder(); try { Map<String, String> env = new HashMap<>(); // TODO What else is required in this environment map. env.putAll(localEnv); - env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); + env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser()); - FragmentSpecProto fragmentSpec = request.getFragmentSpec(); - TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( - fragmentSpec.getFragmentIdentifierString()); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); QueryIdentifier queryIdentifier = new QueryIdentifier( - request.getApplicationIdString(), dagIdentifier); + vId.getApplicationIdString(), dagIdentifier); Credentials credentials = new Credentials(); DataInputBuffer dib = new DataInputBuffer(); @@ -186,14 +191,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - QueryFragmentInfo fragmentInfo = queryTracker - .registerFragment(queryIdentifier, request.getApplicationIdString(), - fragmentSpec.getDagName(), - dagIdentifier, - fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), - fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(), - jobToken); - + QueryFragmentInfo fragmentInfo = queryTracker.registerFragment( + queryIdentifier, vId.getApplicationIdString(), vertex.getDagName(), dagIdentifier, + vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), + vertex.getUser(), vertex, jobToken, fragmentIdString); String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); @@ -202,14 +203,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), + + Configuration callableConf = new Configuration(getConfig()); + TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this, tezHadoopShim); + this, tezHadoopShim, attemptId); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { - LOG.info("SubmissionState for {} : {} ", ndcContextString, submissionState); + LOG.info("SubmissionState for {} : {} ", fragmentIdString, submissionState); } if (submissionState.equals(Scheduler.SubmissionState.REJECTED)) { @@ -300,24 +303,25 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu return sb.toString(); } - public static String stringifySubmitRequest(SubmitWorkRequestProto request) { + public static String stringifySubmitRequest( + SubmitWorkRequestProto request, SignableVertexSpec vertex) { StringBuilder sb = new StringBuilder(); - FragmentSpecProto fragmentSpec = request.getFragmentSpec(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); - sb.append(", taskInfo=").append(fragmentSpec.getFragmentIdentifierString()); - sb.append(", user=").append(request.getUser()); - sb.append(", appIdString=").append(request.getApplicationIdString()); - sb.append(", appAttemptNum=").append(request.getAppAttemptNumber()); + sb.append(", taskInfo=").append(vertex.getVertexIdentifier()).append(" fragment ") + .append(request.getFragmentNumber()).append(" attempt ").append(request.getAttemptNumber()); + sb.append(", user=").append(vertex.getUser()); + sb.append(", appIdString=").append(vertex.getVertexIdentifier().getApplicationIdString()); + sb.append(", appAttemptNum=").append(vertex.getVertexIdentifier().getAppAttemptNumber()); sb.append(", containerIdString=").append(request.getContainerIdString()); - sb.append(", dagName=").append(fragmentSpec.getDagName()); - sb.append(", vertexName=").append(fragmentSpec.getVertexName()); - sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName()); - sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount()); - sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount()); - sb.append(", numGroupedInputs=").append(fragmentSpec.getGroupedInputSpecsCount()); + sb.append(", dagName=").append(vertex.getDagName()); + sb.append(", vertexName=").append(vertex.getVertexName()); + sb.append(", processor=").append(vertex.getProcessorDescriptor().getClassName()); + sb.append(", numInputs=").append(vertex.getInputSpecsCount()); + sb.append(", numOutputs=").append(vertex.getOutputSpecsCount()); + sb.append(", numGroupedInputs=").append(vertex.getGroupedInputSpecsCount()); sb.append(", Inputs={"); - if (fragmentSpec.getInputSpecsCount() > 0) { - for (IOSpecProto ioSpec : fragmentSpec.getInputSpecsList()) { + if (vertex.getInputSpecsCount() > 0) { + for (IOSpecProto ioSpec : vertex.getInputSpecsList()) { sb.append("{").append(ioSpec.getConnectedVertexName()).append(",") .append(ioSpec.getIoDescriptor().getClassName()).append(",") .append(ioSpec.getPhysicalEdgeCount()).append("}"); @@ -325,8 +329,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } sb.append("}"); sb.append(", Outputs={"); - if (fragmentSpec.getOutputSpecsCount() > 0) { - for (IOSpecProto ioSpec : fragmentSpec.getOutputSpecsList()) { + if (vertex.getOutputSpecsCount() > 0) { + for (IOSpecProto ioSpec : vertex.getOutputSpecsList()) { sb.append("{").append(ioSpec.getConnectedVertexName()).append(",") .append(ioSpec.getIoDescriptor().getClassName()).append(",") .append(ioSpec.getPhysicalEdgeCount()).append("}"); @@ -334,8 +338,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu } sb.append("}"); sb.append(", GroupedInputs={"); - if (fragmentSpec.getGroupedInputSpecsCount() > 0) { - for (GroupInputSpecProto group : fragmentSpec.getGroupedInputSpecsList()) { + if (vertex.getGroupedInputSpecsCount() > 0) { + for (GroupInputSpecProto group : vertex.getGroupedInputSpecsList()) { sb.append("{").append("groupName=").append(group.getGroupName()).append(", elements=") .append(group.getGroupVerticesList()).append("}"); sb.append(group.getGroupVerticesList()); http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java index 480a394..195775e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java @@ -21,8 +21,8 @@ import java.util.List; import com.google.common.base.Preconditions; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,19 +35,20 @@ public class QueryFragmentInfo { private final String vertexName; private final int fragmentNumber; private final int attemptNumber; - private final FragmentSpecProto fragmentSpec; + private final SignableVertexSpec vertexSpec; + private final String fragmentIdString; public QueryFragmentInfo(QueryInfo queryInfo, String vertexName, int fragmentNumber, - int attemptNumber, - FragmentSpecProto fragmentSpec) { + int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) { Preconditions.checkNotNull(queryInfo); Preconditions.checkNotNull(vertexName); - Preconditions.checkNotNull(fragmentSpec); + Preconditions.checkNotNull(vertexSpec); this.queryInfo = queryInfo; this.vertexName = vertexName; this.fragmentNumber = fragmentNumber; this.attemptNumber = attemptNumber; - this.fragmentSpec = fragmentSpec; + this.vertexSpec = vertexSpec; + this.fragmentIdString = fragmentIdString; } // Only meant for use by the QueryTracker @@ -55,8 +56,8 @@ public class QueryFragmentInfo { return this.queryInfo; } - public FragmentSpecProto getFragmentSpec() { - return fragmentSpec; + public SignableVertexSpec getVertexSpec() { + return vertexSpec; } public String getVertexName() { @@ -72,7 +73,7 @@ public class QueryFragmentInfo { } public String getFragmentIdentifierString() { - return fragmentSpec.getFragmentIdentifierString(); + return fragmentIdString; } /** @@ -85,7 +86,7 @@ public class QueryFragmentInfo { * @return true if the task can finish, false otherwise */ public boolean canFinish() { - List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList(); + List<IOSpecProto> inputSpecList = vertexSpec.getInputSpecsList(); boolean canFinish = true; if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { @@ -126,7 +127,7 @@ public class QueryFragmentInfo { public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler, boolean lastFinishableState) { List<String> sourcesOfInterest = new LinkedList<>(); - List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList(); + List<IOSpecProto> inputSpecList = vertexSpec.getInputSpecsList(); if (inputSpecList != null && !inputSpecList.isEmpty()) { for (IOSpecProto inputSpec : inputSpecList) { if (LlapTezUtils.isSourceOfInterest(inputSpec.getIoDescriptor().getClassName())) { http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 8daef9e..6914134 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -35,7 +35,7 @@ import com.google.common.collect.Multimap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; public class QueryInfo { @@ -92,9 +92,10 @@ public class QueryInfo { return sourceStateMap; } - public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, FragmentSpecProto fragmentSpec) { - QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(this, vertexName, fragmentNumber, attemptNumber, - fragmentSpec); + public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, + int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) { + QueryFragmentInfo fragmentInfo = new QueryFragmentInfo( + this, vertexName, fragmentNumber, attemptNumber, vertexSpec, fragmentIdString); knownFragments.add(fragmentInfo); return fragmentInfo; } http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index cb3be2b..8abd198 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; @@ -113,20 +113,11 @@ public class QueryTracker extends AbstractService { /** * Register a new fragment for a specific query - * @param queryIdentifier - * @param appIdString - * @param dagName - * @param dagIdentifier - * @param vertexName - * @param fragmentNumber - * @param attemptNumber - * @param user - * @throws IOException */ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, - String user, FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) - throws IOException { + String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, + String fragmentIdString) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { @@ -166,7 +157,8 @@ public class QueryTracker extends AbstractService { .registerDag(appIdString, dagIdentifier, appToken, user, queryInfo.getLocalDirs()); - return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); + return queryInfo.registerFragment( + vertexName, fragmentNumber, attemptNumber, vertex, fragmentIdString); } finally { dagLock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 1933eb1..eac0e8f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; @@ -191,8 +191,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta TaskWrapper task = e.getValue(); boolean isFirst = true; TaskRunnerCallable c = task.getTaskRunnerCallable(); - if (c != null && c.getRequest() != null && c.getRequest().getFragmentSpec() != null) { - FragmentSpecProto fs = c.getRequest().getFragmentSpec(); + if (c != null && c.getVertexSpec() != null) { + SignableVertexSpec fs = c.getVertexSpec(); value.append(isFirst ? " (" : ", ").append(fs.getDagName()) .append("/").append(fs.getVertexName()); isFirst = false; @@ -781,7 +781,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() + ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() + ", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() + - ", vertexParallelism= " + taskRunnerCallable.getFragmentSpec().getVertexParallelism() + + ", vertexParallelism= " + taskRunnerCallable.getVertexSpec().getVertexParallelism() + ", selfAndUpstreamParallelism= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() + ", selfAndUpstreamComplete= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() + '}'; http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index fcfa940..3093de7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -33,8 +33,8 @@ import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; @@ -113,6 +113,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean isCompleted = new AtomicBoolean(false); private final AtomicBoolean killInvoked = new AtomicBoolean(false); + private final SignableVertexSpec vertex; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, @@ -123,7 +124,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, - HadoopShim tezHadoopShim) { + HadoopShim tezHadoopShim, TezTaskAttemptID attemptId) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -134,17 +135,20 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.memoryAvailable = memoryAvailable; this.confParams = confParams; this.jobToken = TokenCache.getSessionToken(credentials); - this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); + // TODO: support binary spec here or above + this.vertex = request.getWorkSpec().getVertex(); + this.taskSpec = Converters.getTaskSpecfromProto( + vertex, request.getFragmentNumber(), request.getAttemptNumber(), attemptId); this.amReporter = amReporter; // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier()); + vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier()); } this.metrics = metrics; - this.requestId = request.getFragmentSpec().getFragmentIdentifierString(); + this.requestId = taskSpec.getTaskAttemptID().toString(); // TODO Change this to the queryId/Name when that's available. - this.queryId = request.getFragmentSpec().getDagName(); + this.queryId = vertex.getDagName(); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; this.tezHadoopShim = tezHadoopShim; @@ -184,16 +188,16 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // TODO Consolidate this code with TezChild. runtimeWatch.start(); - UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); + UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser()); taskUgi.addCredentials(credentials); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>(); serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, TezCommonUtils.convertJobTokenToBytes(jobToken)); - Multimap<String, String> startedInputsMap = createStartedInputMap(request.getFragmentSpec()); + Multimap<String, String> startedInputsMap = createStartedInputMap(vertex); UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); + UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier()); final InetSocketAddress address = NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); SecurityUtil.setTokenService(jobToken, address); @@ -228,7 +232,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { if (shouldRunTask) { taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(), taskSpec, - request.getAppAttemptNumber(), + vertex.getVertexIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, @@ -313,7 +317,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { */ public void reportTaskKilled() { killedTaskHandler - .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, + .taskKilled(request.getAmHost(), request.getAmPort(), vertex.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier(), taskSpec.getTaskAttemptID()); } @@ -321,15 +325,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return fragmentInfo.canFinish(); } - private Multimap<String, String> createStartedInputMap(FragmentSpecProto fragmentSpec) { + private static Multimap<String, String> createStartedInputMap(SignableVertexSpec vertex) { Multimap<String, String> startedInputMap = HashMultimap.create(); // Let the Processor control start for Broadcast inputs. // TODO For now, this affects non broadcast unsorted cases as well. Make use of the edge // property when it's available. - for (IOSpecProto inputSpec : fragmentSpec.getInputSpecsList()) { + for (IOSpecProto inputSpec : vertex.getInputSpecsList()) { if (inputSpec.getIoDescriptor().getClassName().equals(UnorderedKVInput.class.getName())) { - startedInputMap.put(fragmentSpec.getVertexName(), inputSpec.getConnectedVertexName()); + startedInputMap.put(vertex.getVertexName(), inputSpec.getConnectedVertexName()); } } return startedInputMap; @@ -350,7 +354,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @Override public String toString() { return requestId + " {canFinish: " + canFinish() + - ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() + + ", vertexParallelism: " + vertex.getVertexParallelism() + ", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() + ", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() + ", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() + @@ -454,14 +458,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { fragmentCompletionHanler.fragmentComplete(fragmentInfo); taskRunnerCallable.shutdown(); - HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), - executionContext.getHostName(), request.getFragmentSpec().getDagName(), - fragmentInfo.getQueryInfo().getDagIdentifier(), - request.getFragmentSpec().getVertexName(), - request.getFragmentSpec().getFragmentNumber(), - request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, - taskRunnerCallable.startTime, true); + logFragmentEnd(true); } @Override @@ -471,14 +468,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { fragmentCompletionHanler.fragmentComplete(fragmentInfo); // TODO HIVE-10236 Report a fatal error over the umbilical taskRunnerCallable.shutdown(); - HistoryLogger - .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), - executionContext.getHostName(), request.getFragmentSpec().getDagName(), - fragmentInfo.getQueryInfo().getDagIdentifier(), - request.getFragmentSpec().getVertexName(), - request.getFragmentSpec().getFragmentNumber(), - request.getFragmentSpec().getAttemptNumber(), taskRunnerCallable.threadName, - taskRunnerCallable.startTime, false); + logFragmentEnd(false); + } + + protected void logFragmentEnd(boolean success) { + HistoryLogger.logFragmentEnd(vertex.getVertexIdentifier().getApplicationIdString(), + request.getContainerIdString(), executionContext.getHostName(), vertex.getDagName(), + fragmentInfo.getQueryInfo().getDagIdentifier(), vertex.getVertexName(), + request.getFragmentNumber(), request.getAttemptNumber(), taskRunnerCallable.threadName, + taskRunnerCallable.startTime, success); } } @@ -498,12 +496,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { public static String getTaskIdentifierString( SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); - sb.append("AppId=").append(request.getApplicationIdString()) + // TODO: also support the binary version + SignableVertexSpec vertex = request.getWorkSpec().getVertex(); + sb.append("AppId=").append(vertex.getVertexIdentifier().getApplicationIdString()) .append(", containerId=").append(request.getContainerIdString()) - .append(", Dag=").append(request.getFragmentSpec().getDagName()) - .append(", Vertex=").append(request.getFragmentSpec().getVertexName()) - .append(", FragmentNum=").append(request.getFragmentSpec().getFragmentNumber()) - .append(", Attempt=").append(request.getFragmentSpec().getAttemptNumber()); + .append(", Dag=").append(vertex.getDagName()) + .append(", Vertex=").append(vertex.getVertexName()) + .append(", FragmentNum=").append(request.getFragmentNumber()) + .append(", Attempt=").append(request.getAttemptNumber()); return sb.toString(); } @@ -511,7 +511,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return request.getFragmentRuntimeInfo(); } - public FragmentSpecProto getFragmentSpec() { - return request.getFragmentSpec(); + public SignableVertexSpec getVertexSpec() { + // TODO: support for binary spec? presumably we'd parse it somewhere earlier + return vertex; } } http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index c6ba14e..d699f20 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -26,9 +26,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; @@ -51,26 +53,25 @@ public class TaskExecutorTestHelpers { SubmitWorkRequestProto requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism, startTime); - QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(requestProto.getFragmentSpec()); + QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo( + requestProto.getWorkSpec().getVertex(), requestProto.getFragmentNumber()); MockRequest mockRequest = new MockRequest(requestProto, queryFragmentInfo, canFinish, workTime); return mockRequest; } public static TaskExecutorService.TaskWrapper createTaskWrapper( SubmitWorkRequestProto request, boolean canFinish, int workTime) { - QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo(request.getFragmentSpec()); + QueryFragmentInfo queryFragmentInfo = createQueryFragmentInfo( + request.getWorkSpec().getVertex(), request.getFragmentNumber()); MockRequest mockRequest = new MockRequest(request, queryFragmentInfo, canFinish, workTime); TaskExecutorService.TaskWrapper taskWrapper = new TaskExecutorService.TaskWrapper(mockRequest, null); return taskWrapper; } - public static QueryFragmentInfo createQueryFragmentInfo(FragmentSpecProto fragmentSpecProto) { - QueryInfo queryInfo = createQueryInfo(); - QueryFragmentInfo fragmentInfo = - new QueryFragmentInfo(queryInfo, "fakeVertexName", fragmentSpecProto.getFragmentNumber(), 0, - fragmentSpecProto); - return fragmentInfo; + public static QueryFragmentInfo createQueryFragmentInfo( + SignableVertexSpec vertex, int fragmentNum) { + return new QueryFragmentInfo(createQueryInfo(), "fakeVertexName", fragmentNum, 0, vertex, ""); } public static QueryInfo createQueryInfo() { @@ -100,20 +101,23 @@ public class TaskExecutorTestHelpers { TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); return SubmitWorkRequestProto .newBuilder() - .setFragmentSpec( - FragmentSpecProto - .newBuilder() - .setAttemptNumber(0) + .setAttemptNumber(0) + .setFragmentNumber(fragmentNumber) + .setWorkSpec( + VertexOrBinary.newBuilder().setVertex( + SignableVertexSpec.newBuilder() .setDagName("MockDag") - .setFragmentNumber(fragmentNumber) + .setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0)) .setVertexName("MockVertex") .setProcessorDescriptor( LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder() .setClassName("MockProcessor").build()) - .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") - .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") - .setContainerIdString("MockContainer_1").setUser("MockUser") - .setTokenIdentifier("MockToken_1") + .build()).build()) + .setAmHost("localhost") + .setAmPort(12345) + .setContainerIdString("MockContainer_1") .setFragmentRuntimeInfo(LlapDaemonProtocolProtos .FragmentRuntimeInfo .newBuilder() @@ -146,7 +150,7 @@ public class TaskExecutorTestHelpers { new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class), new DefaultHadoopShim()); + FragmentCompletionHandler.class), new DefaultHadoopShim(), null); this.workTime = workTime; this.canFinish = canFinish; } http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 08ee769..a250882 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -31,8 +31,11 @@ import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexIdentifier; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; @@ -59,7 +62,7 @@ public class TestFirstInFirstOutComparator { super(requestProto, mock(QueryFragmentInfo.class), conf, new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class), new DefaultHadoopShim()); + FragmentCompletionHandler.class), new DefaultHadoopShim(), null); this.workTime = workTime; this.canFinish = canFinish; } @@ -102,19 +105,23 @@ public class TestFirstInFirstOutComparator { TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); return SubmitWorkRequestProto .newBuilder() - .setFragmentSpec( - FragmentSpecProto + .setAttemptNumber(0) + .setFragmentNumber(fragmentNumber) + .setWorkSpec( + VertexOrBinary.newBuilder().setVertex( + SignableVertexSpec .newBuilder() - .setAttemptNumber(0) + .setVertexIdentifier(Converters.createVertexIdentifier(taId, 0)) .setDagName("MockDag") - .setFragmentNumber(fragmentNumber) .setVertexName("MockVertex") + .setUser("MockUser") + .setTokenIdentifier("MockToken_1") .setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") - .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") - .setContainerIdString("MockContainer_1").setUser("MockUser") - .setTokenIdentifier("MockToken_1") + .build()).build()) + .setAmHost("localhost") + .setAmPort(12345) + .setContainerIdString("MockContainer_1") .setFragmentRuntimeInfo(LlapDaemonProtocolProtos .FragmentRuntimeInfo .newBuilder() http://git-wip-us.apache.org/repos/asf/hive/blob/0b5c27fd/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index b4b041a..a3f2eb8 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; @@ -89,10 +90,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); - private static final boolean isDebugEnabed = LOG.isDebugEnabled(); - - private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST; - + private final ConcurrentMap<QueryIdentifierProto, ByteBuffer> credentialMap; // Tracks containerIds and taskAttemptIds, so can be kept independent of the running DAG. @@ -105,6 +103,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private long deleteDelayOnDagComplete; private final LlapTaskUmbilicalProtocol umbilical; private final Token<LlapTokenIdentifier> token; + private final int appAttemptId; + private final String user; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. @@ -113,8 +113,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private final LlapRegistryService serviceRegistry; - - private volatile int currentDagId; private volatile QueryIdentifierProto currentQueryIdentifierProto; public LlapTaskCommunicator( @@ -138,17 +136,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { serviceRegistry = LlapRegistryService.getClient(conf); umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); - SubmitWorkRequestProto.Builder baseBuilder = SubmitWorkRequestProto.newBuilder(); // TODO Avoid reading this from the environment - baseBuilder.setUser(System.getenv(ApplicationConstants.Environment.USER.name())); - baseBuilder.setApplicationIdString( - taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString()); - baseBuilder - .setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId()); - baseBuilder.setTokenIdentifier(getTokenIdentifier()); - - BASE_SUBMIT_WORK_REQUEST = baseBuilder.build(); + user = System.getenv(ApplicationConstants.Environment.USER.name()); + appAttemptId = taskCommunicatorContext.getApplicationAttemptId().getAttemptId(); credentialMap = new ConcurrentHashMap<>(); sourceStateTracker = new SourceStateTracker(getContext(), this); @@ -316,7 +307,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { t = se.getCause(); } if (t instanceof RemoteException) { - RemoteException re = (RemoteException) t; // All others from the remote service cause the task to FAIL. LOG.info( "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + @@ -591,8 +581,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { TaskSpec taskSpec, FragmentRuntimeInfo fragmentRuntimeInfo) throws IOException { - SubmitWorkRequestProto.Builder builder = - SubmitWorkRequestProto.newBuilder(BASE_SUBMIT_WORK_REQUEST); + SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder(); + builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId()); + builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId()); builder.setContainerIdString(containerId.toString()); builder.setAmHost(getAddress().getHostName()); builder.setAmPort(getAddress().getPort()); @@ -607,7 +598,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { credentialsBinary = credentialsBinary.duplicate(); } builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary)); - builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec)); + builder.setWorkSpec(VertexOrBinary.newBuilder().setVertex(Converters.convertTaskSpecToProto( + taskSpec, appAttemptId, getTokenIdentifier(), null, user)).build()); + // Don't call builder.setWorkSpecSignature() - Tez doesn't sign fragments builder.setFragmentRuntimeInfo(fragmentRuntimeInfo); return builder.build(); }
