Repository: hive Updated Branches: refs/heads/llap 33c86c45c -> 342668f91
reverting HIVE-13674: usingTezAm field not required in LLAP SubmitWorkRequestProto Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/342668f9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/342668f9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/342668f9 Branch: refs/heads/llap Commit: 342668f914587aa396a7f74c421c4dcf2037d433 Parents: 33c86c4 Author: Jason Dere <[email protected]> Authored: Mon May 2 18:23:30 2016 -0700 Committer: Jason Dere <[email protected]> Committed: Mon May 2 18:23:30 2016 -0700 ---------------------------------------------------------------------- .../ext/LlapTaskUmbilicalExternalClient.java | 4 +- .../daemon/rpc/LlapDaemonProtocolProtos.java | 230 +++++++++++++------ .../src/protobuf/LlapDaemonProtocol.proto | 8 + .../hadoop/hive/llap/LlapBaseInputFormat.java | 1 + .../llap/daemon/impl/TaskRunnerCallable.java | 3 + 5 files changed, 175 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java index fe2fd7c..8598bc8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java @@ -123,10 +123,12 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService { /** - * Submit the work for actual execution. + * Submit the work for actual execution. This should always have the usingTezAm flag disabled * @param submitWorkRequestProto */ public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) { + Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false); + // Register the pending events to be sent for this spec. String fragmentId = submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(); PendingEventData pendingEventData = new PendingEventData( http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 6a20031..653e7e0 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -1,5 +1,5 @@ // Generated by the protocol buffer compiler. DO NOT EDIT! -// source: llap-common/src/protobuf/LlapDaemonProtocol.proto +// source: LlapDaemonProtocol.proto package org.apache.hadoop.hive.llap.daemon.rpc; @@ -7334,6 +7334,16 @@ public final class LlapDaemonProtocolProtos { * <code>optional .FragmentRuntimeInfo fragment_runtime_info = 10;</code> */ org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfoOrBuilder getFragmentRuntimeInfoOrBuilder(); + + // optional bool usingTezAm = 11 [default = true]; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + boolean hasUsingTezAm(); + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + boolean getUsingTezAm(); } /** * Protobuf type {@code SubmitWorkRequestProto} @@ -7452,6 +7462,11 @@ public final class LlapDaemonProtocolProtos { bitField0_ |= 0x00000200; break; } + case 88: { + bitField0_ |= 0x00000400; + usingTezAm_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -7799,6 +7814,22 @@ public final class LlapDaemonProtocolProtos { return fragmentRuntimeInfo_; } + // optional bool usingTezAm = 11 [default = true]; + public static final int USINGTEZAM_FIELD_NUMBER = 11; + private boolean usingTezAm_; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean hasUsingTezAm() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean getUsingTezAm() { + return usingTezAm_; + } + private void initFields() { containerIdString_ = ""; amHost_ = ""; @@ -7810,6 +7841,7 @@ public final class LlapDaemonProtocolProtos { appAttemptNumber_ = 0; fragmentSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance(); fragmentRuntimeInfo_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo.getDefaultInstance(); + usingTezAm_ = true; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7853,6 +7885,9 @@ public final class LlapDaemonProtocolProtos { if (((bitField0_ & 0x00000200) == 0x00000200)) { output.writeMessage(10, fragmentRuntimeInfo_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + output.writeBool(11, usingTezAm_); + } getUnknownFields().writeTo(output); } @@ -7902,6 +7937,10 @@ public final class LlapDaemonProtocolProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(10, fragmentRuntimeInfo_); } + if (((bitField0_ & 0x00000400) == 0x00000400)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(11, usingTezAm_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -7975,6 +8014,11 @@ public final class LlapDaemonProtocolProtos { result = result && getFragmentRuntimeInfo() .equals(other.getFragmentRuntimeInfo()); } + result = result && (hasUsingTezAm() == other.hasUsingTezAm()); + if (hasUsingTezAm()) { + result = result && (getUsingTezAm() + == other.getUsingTezAm()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -8028,6 +8072,10 @@ public final class LlapDaemonProtocolProtos { hash = (37 * hash) + FRAGMENT_RUNTIME_INFO_FIELD_NUMBER; hash = (53 * hash) + getFragmentRuntimeInfo().hashCode(); } + if (hasUsingTezAm()) { + hash = (37 * hash) + USINGTEZAM_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getUsingTezAm()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -8167,6 +8215,8 @@ public final class LlapDaemonProtocolProtos { fragmentRuntimeInfoBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000200); + usingTezAm_ = true; + bitField0_ = (bitField0_ & ~0x00000400); return this; } @@ -8243,6 +8293,10 @@ public final class LlapDaemonProtocolProtos { } else { result.fragmentRuntimeInfo_ = fragmentRuntimeInfoBuilder_.build(); } + if (((from_bitField0_ & 0x00000400) == 0x00000400)) { + to_bitField0_ |= 0x00000400; + } + result.usingTezAm_ = usingTezAm_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -8299,6 +8353,9 @@ public final class LlapDaemonProtocolProtos { if (other.hasFragmentRuntimeInfo()) { mergeFragmentRuntimeInfo(other.getFragmentRuntimeInfo()); } + if (other.hasUsingTezAm()) { + setUsingTezAm(other.getUsingTezAm()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -9032,6 +9089,39 @@ public final class LlapDaemonProtocolProtos { return fragmentRuntimeInfoBuilder_; } + // optional bool usingTezAm = 11 [default = true]; + private boolean usingTezAm_ = true; + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean hasUsingTezAm() { + return ((bitField0_ & 0x00000400) == 0x00000400); + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public boolean getUsingTezAm() { + return usingTezAm_; + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public Builder setUsingTezAm(boolean value) { + bitField0_ |= 0x00000400; + usingTezAm_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool usingTezAm = 11 [default = true];</code> + */ + public Builder clearUsingTezAm() { + bitField0_ = (bitField0_ & ~0x00000400); + usingTezAm_ = true; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto) } @@ -14365,74 +14455,74 @@ public final class LlapDaemonProtocolProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n1llap-common/src/protobuf/LlapDaemonPro" + - "tocol.proto\"9\n\020UserPayloadProto\022\024\n\014user_" + - "payload\030\001 \001(\014\022\017\n\007version\030\002 \001(\005\"j\n\025Entity" + - "DescriptorProto\022\022\n\nclass_name\030\001 \001(\t\022\'\n\014u" + - "ser_payload\030\002 \001(\0132\021.UserPayloadProto\022\024\n\014" + - "history_text\030\003 \001(\014\"x\n\013IOSpecProto\022\035\n\025con" + - "nected_vertex_name\030\001 \001(\t\022-\n\rio_descripto" + - "r\030\002 \001(\0132\026.EntityDescriptorProto\022\033\n\023physi" + - "cal_edge_count\030\003 \001(\005\"z\n\023GroupInputSpecPr" + - "oto\022\022\n\ngroup_name\030\001 \001(\t\022\026\n\016group_vertice", - "s\030\002 \003(\t\0227\n\027merged_input_descriptor\030\003 \001(\013" + - "2\026.EntityDescriptorProto\"\353\002\n\021FragmentSpe" + - "cProto\022\"\n\032fragment_identifier_string\030\001 \001" + - "(\t\022\020\n\010dag_name\030\002 \001(\t\022\016\n\006dag_id\030\013 \001(\005\022\023\n\013" + - "vertex_name\030\003 \001(\t\0224\n\024processor_descripto" + - "r\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input" + - "_specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spe" + - "cs\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_" + - "specs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022ver" + - "tex_parallelism\030\010 \001(\005\022\027\n\017fragment_number", - "\030\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragm" + - "entRuntimeInfo\022#\n\033num_self_and_upstream_" + - "tasks\030\001 \001(\005\022-\n%num_self_and_upstream_com" + - "pleted_tasks\030\002 \001(\005\022\033\n\023within_dag_priorit" + - "y\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first" + - "_attempt_start_time\030\005 \001(\003\022\"\n\032current_att" + - "empt_start_time\030\006 \001(\003\"F\n\024QueryIdentifier" + - "Proto\022\026\n\016app_identifier\030\001 \001(\t\022\026\n\016dag_ide" + - "ntifier\030\002 \001(\005\"\266\002\n\026SubmitWorkRequestProto" + - "\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_host", - "\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_identif" + - "ier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014\022\014\n" + - "\004user\030\006 \001(\t\022\035\n\025application_id_string\030\007 \001" + - "(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfragme" + - "nt_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025fra" + - "gment_runtime_info\030\n \001(\0132\024.FragmentRunti" + - "meInfo\"J\n\027SubmitWorkResponseProto\022/\n\020sub" + - "mission_state\030\001 \001(\0162\025.SubmissionStatePro" + - "to\"\205\001\n\036SourceStateUpdatedRequestProto\022/\n" + - "\020query_identifier\030\001 \001(\0132\025.QueryIdentifie", - "rProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\0162" + - "\021.SourceStateProto\"!\n\037SourceStateUpdated" + - "ResponseProto\"w\n\031QueryCompleteRequestPro" + - "to\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifier" + - "\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete_" + - "delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponseP" + - "roto\"t\n\035TerminateFragmentRequestProto\022/\n" + - "\020query_identifier\030\001 \001(\0132\025.QueryIdentifie" + - "rProto\022\"\n\032fragment_identifier_string\030\002 \001" + - "(\t\" \n\036TerminateFragmentResponseProto\"\026\n\024", - "GetTokenRequestProto\"&\n\025GetTokenResponse" + - "Proto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProto" + - "\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Subm" + - "issionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJECT" + - "ED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonPr" + - "otocol\022?\n\nsubmitWork\022\027.SubmitWorkRequest" + - "Proto\032\030.SubmitWorkResponseProto\022W\n\022sourc" + - "eStateUpdated\022\037.SourceStateUpdatedReques" + - "tProto\032 .SourceStateUpdatedResponseProto" + - "\022H\n\rqueryComplete\022\032.QueryCompleteRequest", - "Proto\032\033.QueryCompleteResponseProto\022T\n\021te" + - "rminateFragment\022\036.TerminateFragmentReque" + - "stProto\032\037.TerminateFragmentResponseProto" + - "2]\n\026LlapManagementProtocol\022C\n\022getDelegat" + - "ionToken\022\025.GetTokenRequestProto\032\026.GetTok" + - "enResponseProtoBH\n&org.apache.hadoop.hiv" + - "e.llap.daemon.rpcB\030LlapDaemonProtocolPro" + - "tos\210\001\001\240\001\001" + "\n\030LlapDaemonProtocol.proto\"9\n\020UserPayloa" + + "dProto\022\024\n\014user_payload\030\001 \001(\014\022\017\n\007version\030" + + "\002 \001(\005\"j\n\025EntityDescriptorProto\022\022\n\nclass_" + + "name\030\001 \001(\t\022\'\n\014user_payload\030\002 \001(\0132\021.UserP" + + "ayloadProto\022\024\n\014history_text\030\003 \001(\014\"x\n\013IOS" + + "pecProto\022\035\n\025connected_vertex_name\030\001 \001(\t\022" + + "-\n\rio_descriptor\030\002 \001(\0132\026.EntityDescripto" + + "rProto\022\033\n\023physical_edge_count\030\003 \001(\005\"z\n\023G" + + "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" + + "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d", + "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" + + "\353\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" + + "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\016\n\006d" + + "ag_id\030\013 \001(\005\022\023\n\013vertex_name\030\003 \001(\t\0224\n\024proc" + + "essor_descriptor\030\004 \001(\0132\026.EntityDescripto" + + "rProto\022!\n\013input_specs\030\005 \003(\0132\014.IOSpecProt" + + "o\022\"\n\014output_specs\030\006 \003(\0132\014.IOSpecProto\0221\n" + + "\023grouped_input_specs\030\007 \003(\0132\024.GroupInputS" + + "pecProto\022\032\n\022vertex_parallelism\030\010 \001(\005\022\027\n\017" + + "fragment_number\030\t \001(\005\022\026\n\016attempt_number\030", + "\n \001(\005\"\344\001\n\023FragmentRuntimeInfo\022#\n\033num_sel" + + "f_and_upstream_tasks\030\001 \001(\005\022-\n%num_self_a" + + "nd_upstream_completed_tasks\030\002 \001(\005\022\033\n\023wit" + + "hin_dag_priority\030\003 \001(\005\022\026\n\016dag_start_time" + + "\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" + + "\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" + + "QueryIdentifierProto\022\026\n\016app_identifier\030\001" + + " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\320\002\n\026SubmitW" + + "orkRequestProto\022\033\n\023container_id_string\030\001" + + " \001(\t\022\017\n\007am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030", + "\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" + + "binary\030\005 \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025applicatio" + + "n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" + + "\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" + + "pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" + + "\024.FragmentRuntimeInfo\022\030\n\nusingTezAm\030\013 \001(" + + "\010:\004true\"J\n\027SubmitWorkResponseProto\022/\n\020su" + + "bmission_state\030\001 \001(\0162\025.SubmissionStatePr" + + "oto\"\205\001\n\036SourceStateUpdatedRequestProto\022/" + + "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi", + "erProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\016" + + "2\021.SourceStateProto\"!\n\037SourceStateUpdate" + + "dResponseProto\"w\n\031QueryCompleteRequestPr" + + "oto\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifie" + + "r\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" + + "_delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponse" + + "Proto\"t\n\035TerminateFragmentRequestProto\022/" + + "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" + + "erProto\022\"\n\032fragment_identifier_string\030\002 " + + "\001(\t\" \n\036TerminateFragmentResponseProto\"\026\n", + "\024GetTokenRequestProto\"&\n\025GetTokenRespons" + + "eProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProt" + + "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Sub" + + "missionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJEC" + + "TED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonP" + + "rotocol\022?\n\nsubmitWork\022\027.SubmitWorkReques" + + "tProto\032\030.SubmitWorkResponseProto\022W\n\022sour" + + "ceStateUpdated\022\037.SourceStateUpdatedReque" + + "stProto\032 .SourceStateUpdatedResponseProt" + + "o\022H\n\rqueryComplete\022\032.QueryCompleteReques", + "tProto\032\033.QueryCompleteResponseProto\022T\n\021t" + + "erminateFragment\022\036.TerminateFragmentRequ" + + "estProto\032\037.TerminateFragmentResponseProt" + + "o2]\n\026LlapManagementProtocol\022C\n\022getDelega" + + "tionToken\022\025.GetTokenRequestProto\032\026.GetTo" + + "kenResponseProtoBH\n&org.apache.hadoop.hi" + + "ve.llap.daemon.rpcB\030LlapDaemonProtocolPr" + + "otos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14486,7 +14576,7 @@ public final class LlapDaemonProtocolProtos { internal_static_SubmitWorkRequestProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SubmitWorkRequestProto_descriptor, - new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", }); + new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", "UsingTezAm", }); internal_static_SubmitWorkResponseProto_descriptor = getDescriptor().getMessageTypes().get(8); internal_static_SubmitWorkResponseProto_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 944c96c..e964c5f 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -91,6 +91,7 @@ message SubmitWorkRequestProto { optional int32 app_attempt_number = 8; optional FragmentSpecProto fragment_spec = 9; optional FragmentRuntimeInfo fragment_runtime_info = 10; + optional bool usingTezAm = 11 [default = true]; } enum SubmissionStateProto { @@ -136,11 +137,18 @@ message GetTokenResponseProto { optional bytes token = 1; } +message SendEventsRequestProto { +} + +message SendEventsResponseProto { +} + service LlapDaemonProtocol { rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto); rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto); + rpc sendEvents(SendEventsRequestProto) return (SendEventsResponseProto); } service LlapManagementProtocol { http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 8db2f88..10d14c0 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -346,6 +346,7 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0); + builder.setUsingTezAm(false); builder.setFragmentRuntimeInfo(runtimeInfo.build()); return builder.build(); } http://git-wip-us.apache.org/repos/asf/hive/blob/342668f9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 4a33373..efd6f0a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -108,6 +108,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final String queryId; private final HadoopShim tezHadoopShim; private boolean shouldRunTask = true; + private final boolean withTezAm; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); private final AtomicBoolean isStarted = new AtomicBoolean(false); @@ -136,6 +137,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.jobToken = TokenCache.getSessionToken(credentials); this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec()); this.amReporter = amReporter; + this.withTezAm = request.getUsingTezAm(); + LOG.warn("ZZZ: DBG: usingTezAm=" + withTezAm); // Register with the AMReporter when the callable is setup. Unregister once it starts running. this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
