HIVE-10846. LLAP: preemption in AM due to failures / out of order scheduling. (Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8b94f29 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8b94f29 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8b94f29 Branch: refs/heads/llap Commit: b8b94f2979b96f97b8cfe98745a8e016ee6333b5 Parents: 2024c96 Author: Siddharth Seth <ss...@apache.org> Authored: Thu May 28 02:02:17 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu May 28 02:02:17 2015 -0700 ---------------------------------------------------------------------- .../daemon/rpc/LlapDaemonProtocolProtos.java | 599 +++++-------------- .../hive/llap/daemon/impl/AMReporter.java | 3 +- .../llap/daemon/impl/ContainerRunnerImpl.java | 24 +- .../hadoop/hive/llap/daemon/impl/Scheduler.java | 6 + .../llap/daemon/impl/TaskExecutorService.java | 148 +++-- .../llap/daemon/impl/TaskRunnerCallable.java | 30 +- .../hadoop/hive/llap/tezplugins/Converters.java | 4 +- .../llap/tezplugins/LlapTaskCommunicator.java | 106 +++- .../hive/llap/tezplugins/TaskCommunicator.java | 80 +-- .../dag/app/rm/LlapTaskSchedulerService.java | 268 ++++++++- .../src/protobuf/LlapDaemonProtocol.proto | 7 +- .../daemon/impl/TestTaskExecutorService.java | 82 +-- .../app/rm/TestLlapTaskSchedulerService.java | 74 ++- 13 files changed, 813 insertions(+), 618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index d378955..b044df9 100644 --- 
a/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-server/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -3124,20 +3124,20 @@ public final class LlapDaemonProtocolProtos { public interface FragmentSpecProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string task_attempt_id_string = 1; + // optional string fragment_identifier_string = 1; /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - boolean hasTaskAttemptIdString(); + boolean hasFragmentIdentifierString(); /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - java.lang.String getTaskAttemptIdString(); + java.lang.String getFragmentIdentifierString(); /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ com.google.protobuf.ByteString - getTaskAttemptIdStringBytes(); + getFragmentIdentifierStringBytes(); // optional string dag_name = 2; /** @@ -3341,7 +3341,7 @@ public final class LlapDaemonProtocolProtos { } case 10: { bitField0_ |= 0x00000001; - taskAttemptIdString_ = input.readBytes(); + fragmentIdentifierString_ = input.readBytes(); break; } case 18: { @@ -3455,20 +3455,20 @@ public final class LlapDaemonProtocolProtos { } private int bitField0_; - // optional string task_attempt_id_string = 1; - public static final int TASK_ATTEMPT_ID_STRING_FIELD_NUMBER = 1; - private java.lang.Object taskAttemptIdString_; + // optional string fragment_identifier_string = 1; + public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 1; + private java.lang.Object fragmentIdentifierString_; /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public 
boolean hasTaskAttemptIdString() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public java.lang.String getTaskAttemptIdString() { - java.lang.Object ref = taskAttemptIdString_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -3476,22 +3476,22 @@ public final class LlapDaemonProtocolProtos { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - taskAttemptIdString_ = s; + fragmentIdentifierString_ = s; } return s; } } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ public com.google.protobuf.ByteString - getTaskAttemptIdStringBytes() { - java.lang.Object ref = taskAttemptIdString_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - taskAttemptIdString_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -3763,7 +3763,7 @@ public final class LlapDaemonProtocolProtos { } private void initFields() { - taskAttemptIdString_ = ""; + fragmentIdentifierString_ = ""; dagName_ = ""; vertexName_ = ""; processorDescriptor_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto.getDefaultInstance(); @@ -3787,7 +3787,7 @@ public final class LlapDaemonProtocolProtos { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getTaskAttemptIdStringBytes()); + output.writeBytes(1, 
getFragmentIdentifierStringBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getDagNameBytes()); @@ -3827,7 +3827,7 @@ public final class LlapDaemonProtocolProtos { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getTaskAttemptIdStringBytes()); + .computeBytesSize(1, getFragmentIdentifierStringBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -3888,10 +3888,10 @@ public final class LlapDaemonProtocolProtos { org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto) obj; boolean result = true; - result = result && (hasTaskAttemptIdString() == other.hasTaskAttemptIdString()); - if (hasTaskAttemptIdString()) { - result = result && getTaskAttemptIdString() - .equals(other.getTaskAttemptIdString()); + result = result && (hasFragmentIdentifierString() == other.hasFragmentIdentifierString()); + if (hasFragmentIdentifierString()) { + result = result && getFragmentIdentifierString() + .equals(other.getFragmentIdentifierString()); } result = result && (hasDagName() == other.hasDagName()); if (hasDagName()) { @@ -3942,9 +3942,9 @@ public final class LlapDaemonProtocolProtos { } int hash = 41; hash = (19 * hash) + getDescriptorForType().hashCode(); - if (hasTaskAttemptIdString()) { - hash = (37 * hash) + TASK_ATTEMPT_ID_STRING_FIELD_NUMBER; - hash = (53 * hash) + getTaskAttemptIdString().hashCode(); + if (hasFragmentIdentifierString()) { + hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER; + hash = (53 * hash) + getFragmentIdentifierString().hashCode(); } if (hasDagName()) { hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; @@ -4095,7 +4095,7 @@ public final class LlapDaemonProtocolProtos { public Builder clear() { super.clear(); - taskAttemptIdString_ = ""; + fragmentIdentifierString_ = 
""; bitField0_ = (bitField0_ & ~0x00000001); dagName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); @@ -4162,7 +4162,7 @@ public final class LlapDaemonProtocolProtos { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.taskAttemptIdString_ = taskAttemptIdString_; + result.fragmentIdentifierString_ = fragmentIdentifierString_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -4234,9 +4234,9 @@ public final class LlapDaemonProtocolProtos { public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto other) { if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance()) return this; - if (other.hasTaskAttemptIdString()) { + if (other.hasFragmentIdentifierString()) { bitField0_ |= 0x00000001; - taskAttemptIdString_ = other.taskAttemptIdString_; + fragmentIdentifierString_ = other.fragmentIdentifierString_; onChanged(); } if (other.hasDagName()) { @@ -4366,76 +4366,76 @@ public final class LlapDaemonProtocolProtos { } private int bitField0_; - // optional string task_attempt_id_string = 1; - private java.lang.Object taskAttemptIdString_ = ""; + // optional string fragment_identifier_string = 1; + private java.lang.Object fragmentIdentifierString_ = ""; /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public boolean hasTaskAttemptIdString() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public java.lang.String getTaskAttemptIdString() { - java.lang.Object ref = taskAttemptIdString_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (!(ref instanceof 
java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - taskAttemptIdString_ = s; + fragmentIdentifierString_ = s; return s; } else { return (java.lang.String) ref; } } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ public com.google.protobuf.ByteString - getTaskAttemptIdStringBytes() { - java.lang.Object ref = taskAttemptIdString_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - taskAttemptIdString_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public Builder setTaskAttemptIdString( + public Builder setFragmentIdentifierString( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - taskAttemptIdString_ = value; + fragmentIdentifierString_ = value; onChanged(); return this; } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public Builder clearTaskAttemptIdString() { + public Builder clearFragmentIdentifierString() { bitField0_ = (bitField0_ & ~0x00000001); - taskAttemptIdString_ = getDefaultInstance().getTaskAttemptIdString(); + fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString(); onChanged(); return this; } /** - * <code>optional string task_attempt_id_string = 1;</code> + * <code>optional string fragment_identifier_string = 1;</code> */ - public Builder setTaskAttemptIdStringBytes( + public Builder setFragmentIdentifierStringBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new 
NullPointerException(); } bitField0_ |= 0x00000001; - taskAttemptIdString_ = value; + fragmentIdentifierString_ = value; onChanged(); return this; } @@ -10847,50 +10847,20 @@ public final class LlapDaemonProtocolProtos { com.google.protobuf.ByteString getDagNameBytes(); - // optional int32 dag_attempt_number = 3; + // optional string fragment_identifier_string = 7; /** - * <code>optional int32 dag_attempt_number = 3;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - boolean hasDagAttemptNumber(); + boolean hasFragmentIdentifierString(); /** - * <code>optional int32 dag_attempt_number = 3;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - int getDagAttemptNumber(); - - // optional string vertex_name = 4; - /** - * <code>optional string vertex_name = 4;</code> - */ - boolean hasVertexName(); + java.lang.String getFragmentIdentifierString(); /** - * <code>optional string vertex_name = 4;</code> - */ - java.lang.String getVertexName(); - /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ com.google.protobuf.ByteString - getVertexNameBytes(); - - // optional int32 fragment_number = 5; - /** - * <code>optional int32 fragment_number = 5;</code> - */ - boolean hasFragmentNumber(); - /** - * <code>optional int32 fragment_number = 5;</code> - */ - int getFragmentNumber(); - - // optional int32 attempt_number = 6; - /** - * <code>optional int32 attempt_number = 6;</code> - */ - boolean hasAttemptNumber(); - /** - * <code>optional int32 attempt_number = 6;</code> - */ - int getAttemptNumber(); + getFragmentIdentifierStringBytes(); } /** * Protobuf type {@code TerminateFragmentRequestProto} @@ -10953,24 +10923,9 @@ public final class LlapDaemonProtocolProtos { dagName_ = input.readBytes(); break; } - case 24: { + case 58: { bitField0_ |= 0x00000004; - dagAttemptNumber_ = input.readInt32(); - break; - } - case 34: { - bitField0_ |= 0x00000008; - 
vertexName_ = input.readBytes(); - break; - } - case 40: { - bitField0_ |= 0x00000010; - fragmentNumber_ = input.readInt32(); - break; - } - case 48: { - bitField0_ |= 0x00000020; - attemptNumber_ = input.readInt32(); + fragmentIdentifierString_ = input.readBytes(); break; } } @@ -11099,36 +11054,20 @@ public final class LlapDaemonProtocolProtos { } } - // optional int32 dag_attempt_number = 3; - public static final int DAG_ATTEMPT_NUMBER_FIELD_NUMBER = 3; - private int dagAttemptNumber_; + // optional string fragment_identifier_string = 7; + public static final int FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER = 7; + private java.lang.Object fragmentIdentifierString_; /** - * <code>optional int32 dag_attempt_number = 3;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public boolean hasDagAttemptNumber() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * <code>optional int32 dag_attempt_number = 3;</code> - */ - public int getDagAttemptNumber() { - return dagAttemptNumber_; - } - - // optional string vertex_name = 4; - public static final int VERTEX_NAME_FIELD_NUMBER = 4; - private java.lang.Object vertexName_; - /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public boolean hasVertexName() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - /** - * <code>optional string vertex_name = 4;</code> - */ - public java.lang.String getVertexName() { - java.lang.Object ref = vertexName_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -11136,67 +11075,32 @@ public final class LlapDaemonProtocolProtos { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - vertexName_ = s; + fragmentIdentifierString_ = s; } return s; } 
} /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ public com.google.protobuf.ByteString - getVertexNameBytes() { - java.lang.Object ref = vertexName_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - vertexName_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } - // optional int32 fragment_number = 5; - public static final int FRAGMENT_NUMBER_FIELD_NUMBER = 5; - private int fragmentNumber_; - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public int getFragmentNumber() { - return fragmentNumber_; - } - - // optional int32 attempt_number = 6; - public static final int ATTEMPT_NUMBER_FIELD_NUMBER = 6; - private int attemptNumber_; - /** - * <code>optional int32 attempt_number = 6;</code> - */ - public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000020) == 0x00000020); - } - /** - * <code>optional int32 attempt_number = 6;</code> - */ - public int getAttemptNumber() { - return attemptNumber_; - } - private void initFields() { queryId_ = ""; dagName_ = ""; - dagAttemptNumber_ = 0; - vertexName_ = ""; - fragmentNumber_ = 0; - attemptNumber_ = 0; + fragmentIdentifierString_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -11217,16 +11121,7 @@ public final class LlapDaemonProtocolProtos { output.writeBytes(2, getDagNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeInt32(3, dagAttemptNumber_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - output.writeBytes(4, getVertexNameBytes()); - } - if 
(((bitField0_ & 0x00000010) == 0x00000010)) { - output.writeInt32(5, fragmentNumber_); - } - if (((bitField0_ & 0x00000020) == 0x00000020)) { - output.writeInt32(6, attemptNumber_); + output.writeBytes(7, getFragmentIdentifierStringBytes()); } getUnknownFields().writeTo(output); } @@ -11247,19 +11142,7 @@ public final class LlapDaemonProtocolProtos { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(3, dagAttemptNumber_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(4, getVertexNameBytes()); - } - if (((bitField0_ & 0x00000010) == 0x00000010)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(5, fragmentNumber_); - } - if (((bitField0_ & 0x00000020) == 0x00000020)) { - size += com.google.protobuf.CodedOutputStream - .computeInt32Size(6, attemptNumber_); + .computeBytesSize(7, getFragmentIdentifierStringBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -11294,25 +11177,10 @@ public final class LlapDaemonProtocolProtos { result = result && getDagName() .equals(other.getDagName()); } - result = result && (hasDagAttemptNumber() == other.hasDagAttemptNumber()); - if (hasDagAttemptNumber()) { - result = result && (getDagAttemptNumber() - == other.getDagAttemptNumber()); - } - result = result && (hasVertexName() == other.hasVertexName()); - if (hasVertexName()) { - result = result && getVertexName() - .equals(other.getVertexName()); - } - result = result && (hasFragmentNumber() == other.hasFragmentNumber()); - if (hasFragmentNumber()) { - result = result && (getFragmentNumber() - == other.getFragmentNumber()); - } - result = result && (hasAttemptNumber() == other.hasAttemptNumber()); - if (hasAttemptNumber()) { - result = result && (getAttemptNumber() - == other.getAttemptNumber()); + result = result && (hasFragmentIdentifierString() == 
other.hasFragmentIdentifierString()); + if (hasFragmentIdentifierString()) { + result = result && getFragmentIdentifierString() + .equals(other.getFragmentIdentifierString()); } result = result && getUnknownFields().equals(other.getUnknownFields()); @@ -11335,21 +11203,9 @@ public final class LlapDaemonProtocolProtos { hash = (37 * hash) + DAG_NAME_FIELD_NUMBER; hash = (53 * hash) + getDagName().hashCode(); } - if (hasDagAttemptNumber()) { - hash = (37 * hash) + DAG_ATTEMPT_NUMBER_FIELD_NUMBER; - hash = (53 * hash) + getDagAttemptNumber(); - } - if (hasVertexName()) { - hash = (37 * hash) + VERTEX_NAME_FIELD_NUMBER; - hash = (53 * hash) + getVertexName().hashCode(); - } - if (hasFragmentNumber()) { - hash = (37 * hash) + FRAGMENT_NUMBER_FIELD_NUMBER; - hash = (53 * hash) + getFragmentNumber(); - } - if (hasAttemptNumber()) { - hash = (37 * hash) + ATTEMPT_NUMBER_FIELD_NUMBER; - hash = (53 * hash) + getAttemptNumber(); + if (hasFragmentIdentifierString()) { + hash = (37 * hash) + FRAGMENT_IDENTIFIER_STRING_FIELD_NUMBER; + hash = (53 * hash) + getFragmentIdentifierString().hashCode(); } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; @@ -11464,14 +11320,8 @@ public final class LlapDaemonProtocolProtos { bitField0_ = (bitField0_ & ~0x00000001); dagName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); - dagAttemptNumber_ = 0; + fragmentIdentifierString_ = ""; bitField0_ = (bitField0_ & ~0x00000004); - vertexName_ = ""; - bitField0_ = (bitField0_ & ~0x00000008); - fragmentNumber_ = 0; - bitField0_ = (bitField0_ & ~0x00000010); - attemptNumber_ = 0; - bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -11511,19 +11361,7 @@ public final class LlapDaemonProtocolProtos { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.dagAttemptNumber_ = dagAttemptNumber_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } - result.vertexName_ = vertexName_; - if 
(((from_bitField0_ & 0x00000010) == 0x00000010)) { - to_bitField0_ |= 0x00000010; - } - result.fragmentNumber_ = fragmentNumber_; - if (((from_bitField0_ & 0x00000020) == 0x00000020)) { - to_bitField0_ |= 0x00000020; - } - result.attemptNumber_ = attemptNumber_; + result.fragmentIdentifierString_ = fragmentIdentifierString_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -11550,20 +11388,11 @@ public final class LlapDaemonProtocolProtos { dagName_ = other.dagName_; onChanged(); } - if (other.hasDagAttemptNumber()) { - setDagAttemptNumber(other.getDagAttemptNumber()); - } - if (other.hasVertexName()) { - bitField0_ |= 0x00000008; - vertexName_ = other.vertexName_; + if (other.hasFragmentIdentifierString()) { + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = other.fragmentIdentifierString_; onChanged(); } - if (other.hasFragmentNumber()) { - setFragmentNumber(other.getFragmentNumber()); - } - if (other.hasAttemptNumber()) { - setAttemptNumber(other.getAttemptNumber()); - } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -11739,175 +11568,76 @@ public final class LlapDaemonProtocolProtos { return this; } - // optional int32 dag_attempt_number = 3; - private int dagAttemptNumber_ ; + // optional string fragment_identifier_string = 7; + private java.lang.Object fragmentIdentifierString_ = ""; /** - * <code>optional int32 dag_attempt_number = 3;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public boolean hasDagAttemptNumber() { + public boolean hasFragmentIdentifierString() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * <code>optional int32 dag_attempt_number = 3;</code> - */ - public int getDagAttemptNumber() { - return dagAttemptNumber_; - } - /** - * <code>optional int32 dag_attempt_number = 3;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public Builder setDagAttemptNumber(int value) { - bitField0_ |= 0x00000004; - dagAttemptNumber_ = value; - 
onChanged(); - return this; - } - /** - * <code>optional int32 dag_attempt_number = 3;</code> - */ - public Builder clearDagAttemptNumber() { - bitField0_ = (bitField0_ & ~0x00000004); - dagAttemptNumber_ = 0; - onChanged(); - return this; - } - - // optional string vertex_name = 4; - private java.lang.Object vertexName_ = ""; - /** - * <code>optional string vertex_name = 4;</code> - */ - public boolean hasVertexName() { - return ((bitField0_ & 0x00000008) == 0x00000008); - } - /** - * <code>optional string vertex_name = 4;</code> - */ - public java.lang.String getVertexName() { - java.lang.Object ref = vertexName_; + public java.lang.String getFragmentIdentifierString() { + java.lang.Object ref = fragmentIdentifierString_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - vertexName_ = s; + fragmentIdentifierString_ = s; return s; } else { return (java.lang.String) ref; } } /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ public com.google.protobuf.ByteString - getVertexNameBytes() { - java.lang.Object ref = vertexName_; + getFragmentIdentifierStringBytes() { + java.lang.Object ref = fragmentIdentifierString_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - vertexName_ = b; + fragmentIdentifierString_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public Builder setVertexName( + public Builder setFragmentIdentifierString( java.lang.String value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; - vertexName_ = value; + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = value; onChanged(); return this; } /** - * <code>optional string vertex_name = 
4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public Builder clearVertexName() { - bitField0_ = (bitField0_ & ~0x00000008); - vertexName_ = getDefaultInstance().getVertexName(); + public Builder clearFragmentIdentifierString() { + bitField0_ = (bitField0_ & ~0x00000004); + fragmentIdentifierString_ = getDefaultInstance().getFragmentIdentifierString(); onChanged(); return this; } /** - * <code>optional string vertex_name = 4;</code> + * <code>optional string fragment_identifier_string = 7;</code> */ - public Builder setVertexNameBytes( + public Builder setFragmentIdentifierStringBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; - vertexName_ = value; - onChanged(); - return this; - } - - // optional int32 fragment_number = 5; - private int fragmentNumber_ ; - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public boolean hasFragmentNumber() { - return ((bitField0_ & 0x00000010) == 0x00000010); - } - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public int getFragmentNumber() { - return fragmentNumber_; - } - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public Builder setFragmentNumber(int value) { - bitField0_ |= 0x00000010; - fragmentNumber_ = value; - onChanged(); - return this; - } - /** - * <code>optional int32 fragment_number = 5;</code> - */ - public Builder clearFragmentNumber() { - bitField0_ = (bitField0_ & ~0x00000010); - fragmentNumber_ = 0; - onChanged(); - return this; - } - - // optional int32 attempt_number = 6; - private int attemptNumber_ ; - /** - * <code>optional int32 attempt_number = 6;</code> - */ - public boolean hasAttemptNumber() { - return ((bitField0_ & 0x00000020) == 0x00000020); - } - /** - * <code>optional int32 attempt_number = 6;</code> - */ - public int getAttemptNumber() { - return attemptNumber_; - } - /** - * <code>optional int32 attempt_number = 6;</code> - */ - 
public Builder setAttemptNumber(int value) { - bitField0_ |= 0x00000020; - attemptNumber_ = value; - onChanged(); - return this; - } - /** - * <code>optional int32 attempt_number = 6;</code> - */ - public Builder clearAttemptNumber() { - bitField0_ = (bitField0_ & ~0x00000020); - attemptNumber_ = 0; + bitField0_ |= 0x00000004; + fragmentIdentifierString_ = value; onChanged(); return this; } @@ -12796,52 +12526,51 @@ public final class LlapDaemonProtocolProtos { "roupInputSpecProto\022\022\n\ngroup_name\030\001 \001(\t\022\026" + "\n\016group_vertices\030\002 \003(\t\0227\n\027merged_input_d", "escriptor\030\003 \001(\0132\026.EntityDescriptorProto\"" + - "\327\002\n\021FragmentSpecProto\022\036\n\026task_attempt_id" + - "_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013verte" + - "x_name\030\003 \001(\t\0224\n\024processor_descriptor\030\004 \001" + - "(\0132\026.EntityDescriptorProto\022!\n\013input_spec" + - "s\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_specs\030\006 " + - "\003(\0132\014.IOSpecProto\0221\n\023grouped_input_specs" + - "\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vertex_p" + - "arallelism\030\010 \001(\005\022\027\n\017fragment_number\030\t \001(" + - "\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023FragmentRu", - "ntimeInfo\022#\n\033num_self_and_upstream_tasks" + - "\030\001 \001(\005\022-\n%num_self_and_upstream_complete" + - "d_tasks\030\002 \001(\005\022\033\n\023within_dag_priority\030\003 \001" + - "(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_atte" + - "mpt_start_time\030\005 \001(\003\022\"\n\032current_attempt_" + - "start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkRequestPr" + - "oto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007am_h" + - "ost\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_iden" + - "tifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005 \001(\014" + - 
"\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_string\030", - "\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n\rfra" + - "gment_spec\030\t \001(\0132\022.FragmentSpecProto\0223\n\025" + - "fragment_runtime_info\030\n \001(\0132\024.FragmentRu" + - "ntimeInfo\"\031\n\027SubmitWorkResponseProto\"f\n\036" + - "SourceStateUpdatedRequestProto\022\020\n\010dag_na" + - "me\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(" + - "\0162\021.SourceStateProto\"!\n\037SourceStateUpdat" + - "edResponseProto\"X\n\031QueryCompleteRequestP" + - "roto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t" + - "\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryComple", - "teResponseProto\"\245\001\n\035TerminateFragmentReq" + - "uestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030" + - "\002 \001(\t\022\032\n\022dag_attempt_number\030\003 \001(\005\022\023\n\013ver" + - "tex_name\030\004 \001(\t\022\027\n\017fragment_number\030\005 \001(\005\022" + - "\026\n\016attempt_number\030\006 \001(\005\" \n\036TerminateFrag" + - "mentResponseProto*2\n\020SourceStateProto\022\017\n" + - "\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\0022\316\002\n\022LlapDa" + - "emonProtocol\022?\n\nsubmitWork\022\027.SubmitWorkR" + - "equestProto\032\030.SubmitWorkResponseProto\022W\n" + - "\022sourceStateUpdated\022\037.SourceStateUpdated", - "RequestProto\032 .SourceStateUpdatedRespons" + - "eProto\022H\n\rqueryComplete\022\032.QueryCompleteR" + - "equestProto\032\033.QueryCompleteResponseProto" + - "\022T\n\021terminateFragment\022\036.TerminateFragmen" + - "tRequestProto\032\037.TerminateFragmentRespons" + - "eProtoBH\n&org.apache.hadoop.hive.llap.da" + - "emon.rpcB\030LlapDaemonProtocolProtos\210\001\001\240\001\001" + "\333\002\n\021FragmentSpecProto\022\"\n\032fragment_identi" + + "fier_string\030\001 \001(\t\022\020\n\010dag_name\030\002 \001(\t\022\023\n\013v" + + 
"ertex_name\030\003 \001(\t\0224\n\024processor_descriptor" + + "\030\004 \001(\0132\026.EntityDescriptorProto\022!\n\013input_" + + "specs\030\005 \003(\0132\014.IOSpecProto\022\"\n\014output_spec" + + "s\030\006 \003(\0132\014.IOSpecProto\0221\n\023grouped_input_s" + + "pecs\030\007 \003(\0132\024.GroupInputSpecProto\022\032\n\022vert" + + "ex_parallelism\030\010 \001(\005\022\027\n\017fragment_number\030" + + "\t \001(\005\022\026\n\016attempt_number\030\n \001(\005\"\344\001\n\023Fragme", + "ntRuntimeInfo\022#\n\033num_self_and_upstream_t" + + "asks\030\001 \001(\005\022-\n%num_self_and_upstream_comp" + + "leted_tasks\030\002 \001(\005\022\033\n\023within_dag_priority" + + "\030\003 \001(\005\022\026\n\016dag_start_time\030\004 \001(\003\022 \n\030first_" + + "attempt_start_time\030\005 \001(\003\022\"\n\032current_atte" + + "mpt_start_time\030\006 \001(\003\"\266\002\n\026SubmitWorkReque" + + "stProto\022\033\n\023container_id_string\030\001 \001(\t\022\017\n\007" + + "am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030\n\020token_" + + "identifier\030\004 \001(\t\022\032\n\022credentials_binary\030\005" + + " \001(\014\022\014\n\004user\030\006 \001(\t\022\035\n\025application_id_str", + "ing\030\007 \001(\t\022\032\n\022app_attempt_number\030\010 \001(\005\022)\n" + + "\rfragment_spec\030\t \001(\0132\022.FragmentSpecProto" + + "\0223\n\025fragment_runtime_info\030\n \001(\0132\024.Fragme" + + "ntRuntimeInfo\"\031\n\027SubmitWorkResponseProto" + + "\"f\n\036SourceStateUpdatedRequestProto\022\020\n\010da" + + "g_name\030\001 \001(\t\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030" + + "\003 \001(\0162\021.SourceStateProto\"!\n\037SourceStateU" + + "pdatedResponseProto\"X\n\031QueryCompleteRequ" + + "estProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_name\030\002" + + " \001(\t\022\027\n\014delete_delay\030\003 \001(\003:\0010\"\034\n\032QueryCo", + "mpleteResponseProto\"g\n\035TerminateFragment" + + 
"RequestProto\022\020\n\010query_id\030\001 \001(\t\022\020\n\010dag_na" + + "me\030\002 \001(\t\022\"\n\032fragment_identifier_string\030\007" + + " \001(\t\" \n\036TerminateFragmentResponseProto*2" + + "\n\020SourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS" + + "_RUNNING\020\0022\316\002\n\022LlapDaemonProtocol\022?\n\nsub" + + "mitWork\022\027.SubmitWorkRequestProto\032\030.Submi" + + "tWorkResponseProto\022W\n\022sourceStateUpdated" + + "\022\037.SourceStateUpdatedRequestProto\032 .Sour" + + "ceStateUpdatedResponseProto\022H\n\rqueryComp", + "lete\022\032.QueryCompleteRequestProto\032\033.Query" + + "CompleteResponseProto\022T\n\021terminateFragme" + + "nt\022\036.TerminateFragmentRequestProto\032\037.Ter" + + "minateFragmentResponseProtoBH\n&org.apach" + + "e.hadoop.hive.llap.daemon.rpcB\030LlapDaemo" + + "nProtocolProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -12877,7 +12606,7 @@ public final class LlapDaemonProtocolProtos { internal_static_FragmentSpecProto_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_FragmentSpecProto_descriptor, - new java.lang.String[] { "TaskAttemptIdString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); + new java.lang.String[] { "FragmentIdentifierString", "DagName", "VertexName", "ProcessorDescriptor", "InputSpecs", "OutputSpecs", "GroupedInputSpecs", "VertexParallelism", "FragmentNumber", "AttemptNumber", }); internal_static_FragmentRuntimeInfo_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_FragmentRuntimeInfo_fieldAccessorTable = new @@ -12925,7 +12654,7 @@ public final class LlapDaemonProtocolProtos { internal_static_TerminateFragmentRequestProto_fieldAccessorTable = new 
com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TerminateFragmentRequestProto_descriptor, - new java.lang.String[] { "QueryId", "DagName", "DagAttemptNumber", "VertexName", "FragmentNumber", "AttemptNumber", }); + new java.lang.String[] { "QueryId", "DagName", "FragmentIdentifierString", }); internal_static_TerminateFragmentResponseProto_descriptor = getDescriptor().getMessageTypes().get(13); internal_static_TerminateFragmentResponseProto_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index ea2d77a..1620ddf 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -101,6 +101,7 @@ public class AMReporter extends AbstractService { this.heartbeatInterval = conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS, LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); + LOG.info("AMReporter running with NodeId: {}", nodeId); } @Override @@ -164,7 +165,7 @@ public class AMReporter extends AbstractService { synchronized (knownAppMasters) { amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - LOG.error(("Ignoring unexpected unregisterRequest for am at: " + amLocation + ":" + port)); + LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port)); } amNodeInfo.decrementAndGetTaskCount(); // Not removing this here. 
Will be removed when taken off the queue and discovered to have 0 http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index d594d6a..2f2ccb0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -45,7 +45,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; -import org.apache.log4j.Logger; import org.apache.log4j.NDC; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; @@ -55,11 +54,16 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; // TODO Convert this to a CompositeService public class ContainerRunnerImpl extends AbstractService implements ContainerRunner, FragmentCompletionHandler { - private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + // TODO Setup a set of threads to process incoming requests. 
+ // Make sure requests for a single dag/query are handled by the same thread + + private static final Logger LOG = LoggerFactory.getLogger(ContainerRunnerImpl.class); public static final String THREAD_NAME_FORMAT_PREFIX = "ContainerExecutor "; private volatile AMReporter amReporter; @@ -143,12 +147,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); // This is the start of container-annotated logging. // TODO Reduce the length of this string. Way too verbose at the moment. - String ndcContextString = - request.getContainerIdString() + "_" + - request.getFragmentSpec().getDagName() + "_" + - request.getFragmentSpec().getVertexName() + - "_" + request.getFragmentSpec().getFragmentNumber() + "_" + - request.getFragmentSpec().getAttemptNumber(); + String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); NDC.push(ndcContextString); try { Map<String, String> env = new HashMap<>(); @@ -158,7 +157,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun FragmentSpecProto fragmentSpec = request.getFragmentSpec(); TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( - fragmentSpec.getTaskAttemptIdString()); + fragmentSpec.getFragmentIdentifierString()); int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); QueryFragmentInfo fragmentInfo = queryTracker @@ -222,7 +221,8 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun @Override public void terminateFragment(TerminateFragmentRequestProto request) { - // TODO Implement when this gets used. 
+ LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString()); + executorService.killFragment(request.getFragmentIdentifierString()); } private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { @@ -235,15 +235,15 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun public static String stringifySubmitRequest(SubmitWorkRequestProto request) { StringBuilder sb = new StringBuilder(); + FragmentSpecProto fragmentSpec = request.getFragmentSpec(); sb.append("am_details=").append(request.getAmHost()).append(":").append(request.getAmPort()); + sb.append(", taskInfo=").append(fragmentSpec.getFragmentIdentifierString()); sb.append(", user=").append(request.getUser()); sb.append(", appIdString=").append(request.getApplicationIdString()); sb.append(", appAttemptNum=").append(request.getAppAttemptNumber()); sb.append(", containerIdString=").append(request.getContainerIdString()); - FragmentSpecProto fragmentSpec = request.getFragmentSpec(); sb.append(", dagName=").append(fragmentSpec.getDagName()); sb.append(", vertexName=").append(fragmentSpec.getVertexName()); - sb.append(", taskInfo=").append(fragmentSpec.getTaskAttemptIdString()); sb.append(", processor=").append(fragmentSpec.getProcessorDescriptor().getClassName()); sb.append(", numInputs=").append(fragmentSpec.getInputSpecsCount()); sb.append(", numOutputs=").append(fragmentSpec.getOutputSpecsCount()); http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java index c3102f9..eb06a2f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java +++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/Scheduler.java @@ -30,4 +30,10 @@ public interface Scheduler<T> { * @throws RejectedExecutionException */ void schedule(T t) throws RejectedExecutionException; + + /** + * Attempt to kill the fragment with the specified fragmentId + * @param fragmentId + */ + void killFragment(String fragmentId); } http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index bfc4d89..453a71e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; -import java.util.Collections; import java.util.Comparator; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; @@ -29,6 +28,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -74,6 +74,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d"; private static final String 
WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d"; + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + // Thread pool for actual execution of work. private final ListeningExecutorService executorService; private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue; @@ -87,7 +89,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { private final AtomicInteger numSlotsAvailable; // Tracks known tasks. - private final Set<TaskWrapper> knownTasks = Collections.newSetFromMap(new ConcurrentHashMap<TaskWrapper, Boolean>()); + private final ConcurrentMap<String, TaskWrapper> knownTasks = new ConcurrentHashMap<>(); private final Object lock = new Object(); @@ -131,27 +133,32 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { @Override public void run() { + try { - synchronized (lock) { - while (waitQueue.isEmpty()) { - lock.wait(); - } - } - // Since schedule() can be called from multiple threads, we peek the wait queue, - // try scheduling the task and then remove the task if scheduling is successful. - // This will make sure the task's place in the wait queue is held until it gets scheduled. - while (true) { + while (!isShutdown.get()) { synchronized (lock) { + // Since schedule() can be called from multiple threads, we peek the wait queue, + // try scheduling the task and then remove the task if scheduling is successful. + // This will make sure the task's place in the wait queue is held until it gets scheduled. task = waitQueue.peek(); if (task == null) { - break; + if (!isShutdown.get()) { + lock.wait(); + } + continue; } - // if the task cannot finish and if no slots are available then don't schedule it. + // if the task cannot finish and if no slots are available then don't schedule it. boolean shouldWait = false; if (task.getTaskRunnerCallable().canFinish()) { + if (isDebugEnabled) { + LOG.debug( + "Attempting to schedule task {}, canFinish={}. 
Current state: preemptionQueueSize={}, numSlotsAvailable={}", + task.getRequestId(), task.getTaskRunnerCallable().canFinish(), + preemptionQueue.size(), numSlotsAvailable.get()); + } if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) { shouldWait = true; } @@ -161,7 +168,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } } if (shouldWait) { - lock.wait(); + if (!isShutdown.get()) { + lock.wait(); + } // Another task at a higher priority may have come in during the wait. Lookup the // queue again to pick up the task at the highest priority. continue; @@ -179,15 +188,20 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { synchronized (lock) { while (waitQueue.isEmpty()) { - lock.wait(); + if (!isShutdown.get()) { + lock.wait(); + } } } } } catch (InterruptedException e) { - // Executor service will create new thread if the current thread gets interrupted. We don't - // need to do anything with the exception. - LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted."); + if (isShutdown.get()) { + LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted after shutdown."); + } else { + LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e); + throw new RuntimeException(e); + } } } } @@ -207,24 +221,23 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { @Override public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { - TaskWrapper taskWrapper = new TaskWrapper(task); - knownTasks.add(taskWrapper); + TaskWrapper taskWrapper = new TaskWrapper(task, this); + knownTasks.put(taskWrapper.getRequestId(), taskWrapper); TaskWrapper evictedTask; try { // Don't need a lock. 
Not subscribed for notifications yet, and marked as inWaitQueue evictedTask = waitQueue.offer(taskWrapper); } catch (RejectedExecutionException e) { - knownTasks.remove(taskWrapper); + knownTasks.remove(taskWrapper.getRequestId()); throw e; } - if (evictedTask == null) { - if (isInfoEnabled) { - LOG.info(task.getRequestId() + " added to wait queue."); - } - if (isDebugEnabled) { - LOG.debug("Wait Queue: {}", waitQueue); - } - } else { + if (isInfoEnabled) { + LOG.info("{} added to wait queue. Current wait queue size={}", task.getRequestId(), waitQueue.size()); + } + if (isDebugEnabled) { + LOG.debug("Wait Queue: {}", waitQueue); + } + if (evictedTask != null) { evictedTask.maybeUnregisterForFinishedStateNotifications(); evictedTask.getTaskRunnerCallable().killTask(); if (isInfoEnabled) { @@ -237,13 +250,41 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } } + @Override + public void killFragment(String fragmentId) { + synchronized (lock) { + TaskWrapper taskWrapper = knownTasks.remove(fragmentId); + // Can be null since the task may have completed meanwhile. 
+ if (taskWrapper != null) { + if (taskWrapper.inWaitQueue) { + if (isDebugEnabled) { + LOG.debug("Removing {} from waitQueue", fragmentId); + } + taskWrapper.setIsInWaitQueue(false); + waitQueue.remove(taskWrapper); + } + if (taskWrapper.inPreemptionQueue) { + if (isDebugEnabled) { + LOG.debug("Removing {} from preemptionQueue", fragmentId); + } + taskWrapper.setIsInPreemptableQueue(false); + preemptionQueue.remove(taskWrapper); + } + taskWrapper.getTaskRunnerCallable().killTask(); + } else { + LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId); + } + lock.notify(); + } + } + private boolean trySchedule(final TaskWrapper taskWrapper) { boolean scheduled = false; try { synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); + boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(taskWrapper); @@ -357,7 +398,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { @Override public void onSuccess(TaskRunner2Result result) { - knownTasks.remove(taskWrapper); + knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result); @@ -366,7 +407,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { @Override public void onFailure(Throwable t) { - knownTasks.remove(taskWrapper); + knownTasks.remove(taskWrapper.getRequestId()); taskWrapper.setIsInPreemptableQueue(false); taskWrapper.maybeUnregisterForFinishedStateNotifications(); 
taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t); @@ -387,6 +428,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } numSlotsAvailable.incrementAndGet(); + LOG.info("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}", + taskWrapper.getRequestId(), waitQueue.size(), numSlotsAvailable.get(), + preemptionQueue.size()); synchronized (lock) { if (!waitQueue.isEmpty()) { lock.notify(); @@ -398,21 +442,23 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // TODO: llap daemon should call this to gracefully shutdown the task executor service public void shutDown(boolean awaitTermination) { - if (awaitTermination) { - if (isDebugEnabled) { - LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + - " service gracefully"); - } - shutdownExecutor(waitQueueExecutorService); - shutdownExecutor(executorService); - shutdownExecutor(executionCompletionExecutorService); - } else { - if (isDebugEnabled) { - LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + - " service immediately"); + if (!isShutdown.getAndSet(true)) { + if (awaitTermination) { + if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service gracefully"); + } + shutdownExecutor(waitQueueExecutorService); + shutdownExecutor(executorService); + shutdownExecutor(executionCompletionExecutorService); + } else { + if (isDebugEnabled) { + LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + + " service immediately"); + } + executorService.shutdownNow(); + waitQueueExecutorService.shutdownNow(); } - executorService.shutdownNow(); - waitQueueExecutorService.shutdownNow(); } } @@ -474,14 +520,16 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } - private class TaskWrapper implements FinishableStateUpdateHandler { + public static class 
TaskWrapper implements FinishableStateUpdateHandler { private final TaskRunnerCallable taskRunnerCallable; private boolean inWaitQueue = true; private boolean inPreemptionQueue = false; private boolean registeredForNotifications = false; + private final TaskExecutorService taskExecutorService; - public TaskWrapper(TaskRunnerCallable taskRunnerCallable) { + public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService taskExecutorService) { this.taskRunnerCallable = taskRunnerCallable; + this.taskExecutorService = taskExecutorService; } // Methods are synchronized primarily for visibility. @@ -548,7 +596,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // Meanwhile the scheduler could try updating states via a synchronized method. LOG.info("DEBUG: Received finishable state update for {}, state={}", taskRunnerCallable.getRequestId(), finishableState); - TaskExecutorService.this.finishableStateUpdated(this, finishableState); + taskExecutorService.finishableStateUpdated(this, finishableState); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 007c83d..1c12e12 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -98,6 +98,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); + private final AtomicBoolean isStarted = new AtomicBoolean(false); private final AtomicBoolean 
isCompleted = new AtomicBoolean(false); private final AtomicBoolean killInvoked = new AtomicBoolean(false); @@ -127,13 +128,15 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { request.getUser(), jobToken); } this.metrics = metrics; - this.requestId = getTaskAttemptId(request); + this.requestId = getRequestId(request); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; } @Override protected TaskRunner2Result callInternal() throws Exception { + isStarted.set(true); + this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); if (LOG.isDebugEnabled()) { @@ -143,12 +146,19 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // Unregister from the AMReporter, since the task is now running. this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); + synchronized (this) { + if (!shouldRunTask) { + LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID()); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } + } + // TODO This executor seems unnecessary. Here and TezChild ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat( - "TezTaskRunner_" + request.getFragmentSpec().getTaskAttemptIdString()) + "TezTaskRunner_" + request.getFragmentSpec().getFragmentIdentifierString()) .build()); executor = MoreExecutors.listeningDecorator(executorReal); @@ -244,6 +254,16 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { taskSpec.getTaskAttemptID()); } shouldRunTask = false; + } else { + // If the task hasn't started, and it is killed - report back to the AM that the task has been killed. 
+ LOG.info("DBG: Reporting taskKilled for non-started fragment {}", getRequestId()); + reportTaskKilled(); + } + if (!isStarted.get()) { + // If the task hasn't started - inform about fragment completion immediately. It's possible for + // the callable to never run. + fragmentCompletionHanler.fragmentComplete(fragmentInfo); + this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort()); } } } else { @@ -360,7 +380,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { metrics.incrExecutorTotalSuccess(); break; case CONTAINER_STOP_REQUESTED: - LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId); + LOG.info("Received container stop request (AM preemption) for {}", requestId); break; case KILL_REQUESTED: LOG.info("Killed task {}", requestId); @@ -439,8 +459,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return sb.toString(); } - private String getTaskAttemptId(SubmitWorkRequestProto request) { - return request.getFragmentSpec().getTaskAttemptIdString(); + private static String getRequestId(SubmitWorkRequestProto request) { + return request.getFragmentSpec().getFragmentIdentifierString(); } public long getFirstAttemptStartTime() { http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java index 5bd1fe9..7428a6a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/Converters.java @@ -43,7 +43,7 @@ public class Converters { public static TaskSpec getTaskSpecfromProto(FragmentSpecProto FragmentSpecProto) { TezTaskAttemptID taskAttemptID = - 
TezTaskAttemptID.fromString(FragmentSpecProto.getTaskAttemptIdString()); + TezTaskAttemptID.fromString(FragmentSpecProto.getFragmentIdentifierString()); ProcessorDescriptor processorDescriptor = null; if (FragmentSpecProto.hasProcessorDescriptor()) { @@ -83,7 +83,7 @@ public class Converters { public static FragmentSpecProto convertTaskSpecToProto(TaskSpec taskSpec) { FragmentSpecProto.Builder builder = FragmentSpecProto.newBuilder(); - builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString()); + builder.setFragmentIdentifierString(taskSpec.getTaskAttemptID().toString()); builder.setDagName(taskSpec.getDAGName()); builder.setVertexName(taskSpec.getVertexName()); builder.setVertexParallelism(taskSpec.getVertexParallelism()); http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 6abd706..6a38d85 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import 
org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.hive.llap.tezplugins.helpers.SourceStateTracker; import org.apache.hadoop.io.DataOutputBuffer; @@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; +import org.apache.tez.dag.api.ContainerEndReason; import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConfiguration; @@ -172,8 +175,15 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override - public void registerContainerEnd(ContainerId containerId) { - super.registerContainerEnd(containerId); + public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) { + super.registerContainerEnd(containerId, endReason); + if (endReason == ContainerEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing containerEnd for container {} caused by internal preemption", containerId); + TezTaskAttemptID taskAttemptId = entityTracker.getTaskAttemptIdForContainer(containerId); + if (taskAttemptId != null) { + sendTaskTerminated(taskAttemptId, true); + } + } entityTracker.unregisterContainer(containerId); } @@ -224,7 +234,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them. 
getTaskCommunicatorContext() .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId); - communicator.submitWork(requestProto, host, port, + communicator.sendSubmitWork(requestProto, host, port, new TaskCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() { @Override public void setResponse(SubmitWorkResponseProto response) { @@ -238,7 +248,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { t = se.getCause(); } if (t instanceof RemoteException) { - RemoteException re = (RemoteException)t; + RemoteException re = (RemoteException) t; String message = re.toString(); // RejectedExecutions from the remote service treated as KILLED if (message.contains(RejectedExecutionException.class.getName())) { @@ -249,7 +259,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { TaskAttemptEndReason.SERVICE_BUSY, "Service Busy"); } else { // All others from the remote service cause the task to FAIL. - LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); getTaskCommunicatorContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.toString()); @@ -264,7 +276,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error"); } else { // Anything else is a FAIL. 
- LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); + LOG.info( + "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + + containerId, t); getTaskCommunicatorContext() .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, t.getMessage()); @@ -275,14 +289,50 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override - public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { - super.unregisterRunningTaskAttempt(taskAttemptID); - entityTracker.unregisterTaskAttempt(taskAttemptID); + public void unregisterRunningTaskAttempt(final TezTaskAttemptID taskAttemptId, + TaskAttemptEndReason endReason) { + super.unregisterRunningTaskAttempt(taskAttemptId, endReason); + + if (endReason == TaskAttemptEndReason.INTERNAL_PREEMPTION) { + LOG.info("Processing taskEnd for task {} caused by internal preemption", taskAttemptId); + sendTaskTerminated(taskAttemptId, false); + } + entityTracker.unregisterTaskAttempt(taskAttemptId); // This will also be invoked for tasks which have been KILLED / rejected by the daemon. // Informing the daemon becomes necessary once the LlapScheduler supports preemption // and/or starts attempting to kill tasks which may be running on a node. } + private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, + boolean invokedByContainerEnd) { + LOG.info( + "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}", + taskAttemptId.toString(), invokedByContainerEnd ? 
"containerEnd" : "taskEnd"); + LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId); + // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself + if (nodeId != null) { + TerminateFragmentRequestProto request = + TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName) + .setFragmentIdentifierString(taskAttemptId.toString()).build(); + communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), + new TaskCommunicator.ExecuteRequestCallback<TerminateFragmentResponseProto>() { + @Override + public void setResponse(TerminateFragmentResponseProto response) { + } + + @Override + public void indicateError(Throwable t) { + LOG.warn("Failed to send terminate fragment request for {}", + taskAttemptId.toString()); + } + }); + } else { + LOG.info( + "Not sending terminate request for fragment {} since it's node is not known. Already unregistered", + taskAttemptId.toString()); + } + } + @Override public void dagComplete(final String dagName) { QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName( @@ -410,7 +460,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { public void taskKilled(TezTaskAttemptID taskAttemptId) { // TODO Unregister the task for state updates, which could in turn unregister the node. 
getTaskCommunicatorContext().taskKilled(taskAttemptId, - TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted"); + TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted"); entityTracker.unregisterTaskAttempt(taskAttemptId); } @@ -480,6 +530,42 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port)); } + LlapNodeId getNodeIdForContainer(ContainerId containerId) { + return containerToNodeMap.get(containerId); + } + + LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) { + return attemptToNodeMap.get(taskAttemptId); + } + + ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) { + LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId); + if (llapNodeId != null) { + BiMap<TezTaskAttemptID, ContainerId> bMap = nodeMap.get(llapNodeId).inverse(); + if (bMap != null) { + return bMap.get(taskAttemptId); + } else { + return null; + } + } else { + return null; + } + } + + TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) { + LlapNodeId llapNodeId = getNodeIdForContainer(containerId); + if (llapNodeId != null) { + BiMap<ContainerId, TezTaskAttemptID> bMap = nodeMap.get(llapNodeId); + if (bMap != null) { + return bMap.get(containerId); + } else { + return null; + } + } else { + return null; + } + } + void unregisterContainer(ContainerId containerId) { LlapNodeId llapNodeId = containerToNodeMap.remove(containerId); if (llapNodeId == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/b8b94f29/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index cec17f9..d357d61 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceSta import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.NetUtils; @@ -84,21 +86,10 @@ public class TaskCommunicator extends AbstractService { executor.shutdownNow(); } - public void submitWork(SubmitWorkRequestProto request, String host, int port, + public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port, final ExecuteRequestCallback<SubmitWorkResponseProto> callback) { ListenableFuture<SubmitWorkResponseProto> future = executor.submit(new SubmitWorkCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback<SubmitWorkResponseProto>() { - @Override - public void onSuccess(SubmitWorkResponseProto result) { - callback.setResponse(result); - } - - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); - + Futures.addCallback(future, new ResponseCallback(callback)); } public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, @@ -106,17 +97,7 @@ public class TaskCommunicator extends AbstractService { final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) { ListenableFuture<SourceStateUpdatedResponseProto> 
future = executor.submit(new SendSourceStateUpdateCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback<SourceStateUpdatedResponseProto>() { - @Override - public void onSuccess(SourceStateUpdatedResponseProto result) { - callback.setResponse(result); - } - - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); + Futures.addCallback(future, new ResponseCallback(callback)); } public void sendQueryComplete(final QueryCompleteRequestProto request, final String host, @@ -124,17 +105,34 @@ public class TaskCommunicator extends AbstractService { final ExecuteRequestCallback<QueryCompleteResponseProto> callback) { ListenableFuture<QueryCompleteResponseProto> future = executor.submit(new SendQueryCompleteCallable(host, port, request)); - Futures.addCallback(future, new FutureCallback<QueryCompleteResponseProto>() { - @Override - public void onSuccess(QueryCompleteResponseProto result) { - callback.setResponse(result); - } + Futures.addCallback(future, new ResponseCallback(callback)); + } - @Override - public void onFailure(Throwable t) { - callback.indicateError(t); - } - }); + public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host, + final int port, + final ExecuteRequestCallback<TerminateFragmentResponseProto> callback) { + ListenableFuture<TerminateFragmentResponseProto> future = + executor.submit(new SendTerminateFragmentCallable(host, port, request)); + Futures.addCallback(future, new ResponseCallback(callback)); + } + + private static class ResponseCallback<TYPE extends Message> implements FutureCallback<TYPE> { + + private final ExecuteRequestCallback<TYPE> callback; + + public ResponseCallback(ExecuteRequestCallback<TYPE> callback) { + this.callback = callback; + } + + @Override + public void onSuccess(TYPE result) { + callback.setResponse(result); + } + + @Override + public void onFailure(Throwable t) { + callback.indicateError(t); + } } private static abstract 
class CallableRequest<REQUEST extends Message, RESPONSE extends Message> @@ -195,6 +193,20 @@ public class TaskCommunicator extends AbstractService { } } + private class SendTerminateFragmentCallable + extends CallableRequest<TerminateFragmentRequestProto, TerminateFragmentResponseProto> { + + protected SendTerminateFragmentCallable(String hostname, int port, + TerminateFragmentRequestProto terminateFragmentRequestProto) { + super(hostname, port, terminateFragmentRequestProto); + } + + @Override + public TerminateFragmentResponseProto call() throws Exception { + return getProxy(hostname, port).terminateFragment(null, request); + } + } + public interface ExecuteRequestCallback<T extends Message> { void setResponse(T response); void indicateError(Throwable t);