TAJO-324: Rename the prefix 'QueryUnit' to Task. Closes #306
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5c852b79 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5c852b79 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5c852b79 Branch: refs/heads/master Commit: 5c852b7984ba96cd60d528dd2086e2745cc8e3f5 Parents: ff57c77 Author: Hyunsik Choi <[email protected]> Authored: Thu Dec 18 12:55:25 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Thu Dec 18 12:55:25 2014 +0900 ---------------------------------------------------------------------- CHANGES | 5 +- .../java/org/apache/tajo/QueryIdFactory.java | 12 +- .../org/apache/tajo/QueryUnitAttemptId.java | 94 -- .../main/java/org/apache/tajo/QueryUnitId.java | 90 -- .../java/org/apache/tajo/TaskAttemptId.java | 94 ++ .../src/main/java/org/apache/tajo/TaskId.java | 90 ++ .../java/org/apache/tajo/util/TajoIdUtils.java | 8 +- tajo-common/src/main/proto/TajoIdProtos.proto | 6 +- .../physical/HashShuffleFileWriteExec.java | 3 +- .../engine/planner/physical/PhysicalExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 2 +- .../tajo/engine/query/QueryUnitRequest.java | 51 -- .../tajo/engine/query/QueryUnitRequestImpl.java | 328 ------- .../apache/tajo/engine/query/TaskRequest.java | 51 ++ .../tajo/engine/query/TaskRequestImpl.java | 328 +++++++ .../tajo/master/DefaultTaskScheduler.java | 162 ++-- .../apache/tajo/master/LazyTaskScheduler.java | 46 +- .../master/NonForwardQueryResultScanner.java | 6 +- .../apache/tajo/master/TajoContainerProxy.java | 4 +- .../event/ContainerAllocatorEventType.java | 2 +- .../tajo/master/event/LocalTaskEvent.java | 8 +- .../event/QueryUnitAttemptScheduleEvent.java | 87 -- .../tajo/master/event/SubQueryEventType.java | 2 +- .../tajo/master/event/SubQueryTaskEvent.java | 8 +- .../master/event/TaskAttemptAssignedEvent.java | 4 +- .../tajo/master/event/TaskAttemptEvent.java | 8 +- .../master/event/TaskAttemptScheduleEvent.java | 4 +- .../event/TaskAttemptStatusUpdateEvent.java | 4 +- .../event/TaskAttemptToSchedulerEvent.java | 87 ++ .../tajo/master/event/TaskCompletionEvent.java | 4 +- .../org/apache/tajo/master/event/TaskEvent.java | 8 +- .../tajo/master/event/TaskFatalErrorEvent.java | 6 +- .../tajo/master/event/TaskRequestEvent.java | 9 +- .../tajo/master/event/TaskTAttemptEvent.java | 10 +- .../querymaster/QueryMasterManagerService.java | 22 +- .../master/querymaster/QueryMasterTask.java | 14 +- .../tajo/master/querymaster/QueryUnit.java | 907 ------------------- .../master/querymaster/QueryUnitAttempt.java | 443 --------- .../tajo/master/querymaster/Repartitioner.java | 14 +- .../tajo/master/querymaster/SubQuery.java | 72 +- .../apache/tajo/master/querymaster/Task.java | 907 +++++++++++++++++++ .../tajo/master/querymaster/TaskAttempt.java | 443 +++++++++ .../main/java/org/apache/tajo/util/JSPUtil.java | 84 +- .../apache/tajo/util/history/HistoryReader.java | 22 +- .../apache/tajo/util/history/HistoryWriter.java | 4 +- .../tajo/util/history/QueryUnitHistory.java | 167 ---- .../tajo/util/history/SubQueryHistory.java | 22 +- .../apache/tajo/util/history/TaskHistory.java | 167 ++++ .../tajo/worker/ExecutionBlockContext.java | 12 +- .../java/org/apache/tajo/worker/FetchImpl.java | 19 +- .../apache/tajo/worker/InterDataRetriever.java | 8 +- .../tajo/worker/TajoWorkerManagerService.java | 8 +- .../main/java/org/apache/tajo/worker/Task.java | 31 +- .../apache/tajo/worker/TaskAttemptContext.java | 12 +- .../org/apache/tajo/worker/TaskHistory.java | 18 +- .../java/org/apache/tajo/worker/TaskRunner.java | 18 +- .../apache/tajo/worker/TaskRunnerHistory.java | 16 +- .../apache/tajo/worker/TaskRunnerManager.java | 10 +- .../retriever/AdvancedDataRetriever.java | 14 +- .../src/main/proto/QueryMasterProtocol.proto | 2 +- .../src/main/proto/TajoWorkerProtocol.proto | 22 +- .../main/resources/webapps/admin/querytasks.jsp | 70 +- .../main/resources/webapps/admin/queryunit.jsp | 134 --- .../src/main/resources/webapps/admin/task.jsp | 134 +++ .../resources/webapps/worker/querytasks.jsp | 70 +- .../main/resources/webapps/worker/queryunit.jsp | 175 ---- .../src/main/resources/webapps/worker/task.jsp | 174 ++++ .../resources/webapps/worker/taskdetail.jsp | 14 +- .../resources/webapps/worker/taskhistory.jsp | 6 +- .../src/main/resources/webapps/worker/tasks.jsp | 14 +- .../apache/tajo/LocalTajoTestingUtility.java | 10 +- .../org/apache/tajo/TestQueryIdFactory.java | 6 +- .../test/java/org/apache/tajo/TestTajoIds.java | 32 +- .../org/apache/tajo/client/TestTajoClient.java | 12 +- .../planner/physical/TestBNLJoinExec.java | 4 +- .../planner/physical/TestBSTIndexExec.java | 2 +- .../planner/physical/TestExternalSortExec.java | 2 +- .../physical/TestFullOuterHashJoinExec.java | 8 +- .../physical/TestFullOuterMergeJoinExec.java | 12 +- .../planner/physical/TestHashAntiJoinExec.java | 2 +- .../planner/physical/TestHashJoinExec.java | 4 +- .../planner/physical/TestHashSemiJoinExec.java | 2 +- .../physical/TestLeftOuterHashJoinExec.java | 10 +- .../physical/TestLeftOuterNLJoinExec.java | 10 +- .../planner/physical/TestMergeJoinExec.java | 2 +- .../engine/planner/physical/TestNLJoinExec.java | 4 +- .../planner/physical/TestPhysicalPlanner.java | 60 +- .../physical/TestProgressExternalSortExec.java | 2 +- .../physical/TestRightOuterHashJoinExec.java | 6 +- .../physical/TestRightOuterMergeJoinExec.java | 12 +- .../engine/planner/physical/TestSortExec.java | 2 +- .../tajo/engine/query/TestGroupByQuery.java | 4 +- .../apache/tajo/master/TestRepartitioner.java | 21 +- .../querymaster/TestIntermediateEntry.java | 2 +- .../querymaster/TestQueryUnitStatusUpdate.java | 194 ---- .../querymaster/TestTaskStatusUpdate.java | 194 ++++ .../java/org/apache/tajo/util/TestJSPUtil.java | 54 +- .../util/history/TestHistoryWriterReader.java | 39 +- .../org/apache/tajo/worker/TestHistory.java | 8 +- .../tajo/worker/TestRangeRetrieverHandler.java | 4 +- .../worker/dataserver/TestHttpDataServer.java | 12 +- .../queries/TestQueryUnitStatusUpdate/case1.sql | 1 - .../queries/TestQueryUnitStatusUpdate/case2.sql | 5 - .../queries/TestQueryUnitStatusUpdate/case3.sql | 10 - .../queries/TestTaskStatusUpdate/case1.sql | 1 + .../queries/TestTaskStatusUpdate/case2.sql | 5 + .../queries/TestTaskStatusUpdate/case3.sql | 10 + tajo-dist/pom.xml | 4 +- .../tajo/pullserver/HttpDataServerHandler.java | 2 +- .../retriever/AdvancedDataRetriever.java | 16 +- .../org/apache/tajo/storage/StorageManager.java | 6 +- .../org/apache/tajo/storage/StorageUtil.java | 4 +- .../storage/hbase/AbstractHBaseAppender.java | 6 +- .../tajo/storage/hbase/HBasePutAppender.java | 4 +- .../tajo/storage/hbase/HBaseStorageManager.java | 2 +- .../tajo/storage/hbase/HFileAppender.java | 8 +- .../java/org/apache/tajo/storage/CSVFile.java | 4 +- .../org/apache/tajo/storage/FileAppender.java | 6 +- .../apache/tajo/storage/FileStorageManager.java | 8 +- .../tajo/storage/HashShuffleAppender.java | 12 +- .../storage/HashShuffleAppenderManager.java | 6 +- .../java/org/apache/tajo/storage/RawFile.java | 4 +- .../java/org/apache/tajo/storage/RowFile.java | 4 +- .../apache/tajo/storage/avro/AvroAppender.java | 4 +- .../tajo/storage/parquet/ParquetAppender.java | 4 +- .../org/apache/tajo/storage/rcfile/RCFile.java | 4 +- .../sequencefile/SequenceFileAppender.java | 4 +- .../tajo/storage/text/DelimitedTextFile.java | 5 +- 128 files changed, 3390 insertions(+), 3397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6b9146a..2230d0f 100644 --- a/CHANGES +++ b/CHANGES @@ -24,7 +24,8 @@ Release 0.9.1 - unreleased IMPROVEMENT - TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa) + TAJO-1221: HA TajoClient should not connect TajoMaster at the first. + (jaehwa) TAJO-1241: Change default client and table time zone behavior. (hyunsik) @@ -224,6 +225,8 @@ Release 0.9.1 - unreleased SUB TASKS + TAJO-324: Rename the prefix 'QueryUnit' to Task. (hyunsik) + TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho) TAJO-1152: RawFile ByteBuffer should be reuse. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java index 8ac0d54..9599007 100644 --- a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java +++ b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java @@ -92,15 +92,15 @@ public class QueryIdFactory { return new ExecutionBlockId(queryId, id); } - public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId) { - return new QueryUnitId(executionBlockId, nextId.incrementAndGet()); + public synchronized static TaskId newTaskId(ExecutionBlockId executionBlockId) { + return new TaskId(executionBlockId, nextId.incrementAndGet()); } - public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId, int id) { - return new QueryUnitId(executionBlockId, id); + public synchronized static TaskId newTaskId(ExecutionBlockId executionBlockId, int id) { + return new TaskId(executionBlockId, id); } - public synchronized static QueryUnitAttemptId newQueryUnitAttemptId(QueryUnitId queryUnitId, final int attemptId) { - return new QueryUnitAttemptId(queryUnitId, attemptId); + public synchronized static TaskAttemptId newTaskAttemptId(TaskId taskId, final int attemptId) { + return new TaskAttemptId(taskId, attemptId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java deleted file mode 100644 index a9fd68b..0000000 --- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import com.google.common.base.Objects; - -public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> { - public static final String QUA_ID_PREFIX = "ta"; - - private QueryUnitId queryUnitId; - private int id; - - public QueryUnitId getQueryUnitId() { - return queryUnitId; - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public QueryUnitAttemptId(QueryUnitId queryUnitId, int id) { - this.queryUnitId = queryUnitId; - this.id = id; - } - - public QueryUnitAttemptId(TajoIdProtos.QueryUnitAttemptIdProto proto) { - this(new QueryUnitId(proto.getQueryUnitId()), proto.getId()); - } - - public TajoIdProtos.QueryUnitAttemptIdProto getProto() { - return TajoIdProtos.QueryUnitAttemptIdProto.newBuilder() - .setQueryUnitId(queryUnitId.getProto()) - .setId(id) - .build(); - } - - @Override - public int compareTo(QueryUnitAttemptId queryUnitAttemptId) { - int result = queryUnitId.compareTo(queryUnitAttemptId.queryUnitId); - if (result == 0) { - return id - queryUnitAttemptId.id; - } else { - return result; - } - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (this == obj) { - return true; - } - if(!(obj instanceof QueryUnitAttemptId)) { - return false; - } - return compareTo((QueryUnitAttemptId)obj) == 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(queryUnitId, id); - } - - @Override - public String toString() { - return QUA_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix(); - } - - public String toStringNoPrefix() { - return queryUnitId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.ATTEMPT_ID_FORMAT.format(id); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java deleted file mode 100644 index da0479b..0000000 --- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import com.google.common.base.Objects; - -public class QueryUnitId implements Comparable<QueryUnitId> { - public static final String QU_ID_PREFIX = "t"; - - private ExecutionBlockId executionBlockId; - private int id; - - public QueryUnitId(ExecutionBlockId executionBlockId, int id) { - this.executionBlockId = executionBlockId; - this.id = id; - } - - public QueryUnitId(TajoIdProtos.QueryUnitIdProto proto) { - this(new ExecutionBlockId(proto.getExecutionBlockId()), proto.getId()); - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } - - public int getId() { - return id; - } - - public TajoIdProtos.QueryUnitIdProto getProto() { - return TajoIdProtos.QueryUnitIdProto.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) - .setId(id) - .build(); - } - - @Override - public int compareTo(QueryUnitId queryUnitId) { - int result = executionBlockId.compareTo(queryUnitId.executionBlockId); - if (result == 0) { - return id - queryUnitId.id; - } else { - return result; - } - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (this == obj) { - return true; - } - if (!(obj instanceof QueryUnitId)) { - return false; - } - return compareTo((QueryUnitId) obj) == 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(executionBlockId, id); - } - - @Override - public String toString() { - return QU_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix(); - } - - public String toStringNoPrefix() { - return executionBlockId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.QU_ID_FORMAT.format(id); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java new file mode 100644 index 0000000..78c6325 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import com.google.common.base.Objects; + +public class TaskAttemptId implements Comparable<TaskAttemptId> { + public static final String QUA_ID_PREFIX = "ta"; + + private TaskId taskId; + private int id; + + public TaskId getTaskId() { + return taskId; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public TaskAttemptId(TaskId taskId, int id) { + this.taskId = taskId; + this.id = id; + } + + public TaskAttemptId(TajoIdProtos.TaskAttemptIdProto proto) { + this(new TaskId(proto.getTaskId()), proto.getId()); + } + + public TajoIdProtos.TaskAttemptIdProto getProto() { + return TajoIdProtos.TaskAttemptIdProto.newBuilder() + .setTaskId(taskId.getProto()) + .setId(id) + .build(); + } + + @Override + public int compareTo(TaskAttemptId taskAttemptId) { + int result = taskId.compareTo(taskAttemptId.taskId); + if (result == 0) { + return id - taskAttemptId.id; + } else { + return result; + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + if(!(obj instanceof TaskAttemptId)) { + return false; + } + return compareTo((TaskAttemptId)obj) == 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(taskId, id); + } + + @Override + public String toString() { + return QUA_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix(); + } + + public String toStringNoPrefix() { + return taskId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.ATTEMPT_ID_FORMAT.format(id); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/TaskId.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/TaskId.java b/tajo-common/src/main/java/org/apache/tajo/TaskId.java new file mode 100644 index 0000000..e1db67d --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/TaskId.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import com.google.common.base.Objects; + +public class TaskId implements Comparable<TaskId> { + public static final String QU_ID_PREFIX = "t"; + + private ExecutionBlockId executionBlockId; + private int id; + + public TaskId(ExecutionBlockId executionBlockId, int id) { + this.executionBlockId = executionBlockId; + this.id = id; + } + + public TaskId(TajoIdProtos.TaskIdProto proto) { + this(new ExecutionBlockId(proto.getExecutionBlockId()), proto.getId()); + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } + + public int getId() { + return id; + } + + public TajoIdProtos.TaskIdProto getProto() { + return TajoIdProtos.TaskIdProto.newBuilder() + .setExecutionBlockId(executionBlockId.getProto()) + .setId(id) + .build(); + } + + @Override + public int compareTo(TaskId taskId) { + int result = executionBlockId.compareTo(taskId.executionBlockId); + if (result == 0) { + return id - taskId.id; + } else { + return result; + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + if (!(obj instanceof TaskId)) { + return false; + } + return compareTo((TaskId) obj) == 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(executionBlockId, id); + } + + @Override + public String toString() { + return QU_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix(); + } + + public String toStringNoPrefix() { + return executionBlockId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.QU_ID_FORMAT.format(id); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java index 978af6f..cc0f854 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java @@ -20,8 +20,8 @@ package org.apache.tajo.util; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; import java.text.DecimalFormat; @@ -34,10 +34,10 @@ public class TajoIdUtils { return new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3])); } - public static QueryUnitAttemptId parseQueryUnitAttemptId(String idStr) { + public static TaskAttemptId parseTaskAttemptId(String idStr) { String[] tokens = idStr.split("_"); - return new QueryUnitAttemptId(new QueryUnitId( + return new TaskAttemptId(new TaskId( new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3])), Integer.parseInt(tokens[4])), Integer.parseInt(tokens[5])); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/proto/TajoIdProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/TajoIdProtos.proto b/tajo-common/src/main/proto/TajoIdProtos.proto index 1fb8bbd..c5e80a6 100644 --- a/tajo-common/src/main/proto/TajoIdProtos.proto +++ b/tajo-common/src/main/proto/TajoIdProtos.proto @@ -31,13 +31,13 @@ message ExecutionBlockIdProto { required int32 id = 2; } -message QueryUnitIdProto { +message TaskIdProto { required ExecutionBlockIdProto executionBlockId = 1; required int32 id = 2; } -message QueryUnitAttemptIdProto { - required QueryUnitIdProto queryUnitId = 1; +message TaskAttemptIdProto { + required TaskIdProto taskId = 1; required int32 id = 2; } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index d051fb6..3c4949f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -27,7 +27,6 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; -import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.HashShuffleAppender; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.Tuple; @@ -87,7 +86,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { HashShuffleAppender appender = appenderMap.get(partId); if (appender == null) { appender = hashShuffleAppenderManager.getAppender(context.getConf(), - context.getQueryId().getQueryUnitId().getExecutionBlockId(), partId, meta, outSchema); + context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema); appenderMap.put(partId, appender); } return appender; http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index 67fb29b..de14c9a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -79,7 +79,7 @@ public abstract class PhysicalExec implements SchemaObject { } protected Path getExecutorTmpDir() { - return new Path(context.getQueryId().getQueryUnitId().getExecutionBlockId().getQueryId().toString(), + return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(), UUID.randomUUID().toString()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index f507988..94cd4ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -88,7 +88,7 @@ public class SeqScanExec extends PhysicalExec { } cacheKey = new TupleCacheKey( - context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey); + context.getTaskId().getTaskId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey); } if (fragments != null http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java deleted file mode 100644 index 3b0d60d..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.engine.query; - -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.enforce.Enforcer; -import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.worker.FetchImpl; - -import java.util.List; - -public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> { - - public QueryUnitAttemptId getId(); - public List<CatalogProtos.FragmentProto> getFragments(); - public String getOutputTableId(); - public boolean isClusteredOutput(); - public String getSerializedData(); - public boolean isInterQuery(); - public void setInterQuery(); - public void addFetch(String name, FetchImpl fetch); - public List<FetchImpl> getFetches(); - public boolean shouldDie(); - public void setShouldDie(); - public QueryContext getQueryContext(TajoConf conf); - public DataChannel getDataChannel(); - public Enforcer getEnforcer(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java deleted file mode 100644 index 1b89afd..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java +++ /dev/null @@ -1,328 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.query; - -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.planner.enforce.Enforcer; -import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto; -import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder; -import org.apache.tajo.worker.FetchImpl; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class QueryUnitRequestImpl implements QueryUnitRequest { - - private QueryUnitAttemptId id; - private List<FragmentProto> fragments; - private String outputTable; - private boolean isUpdated; - private boolean clusteredOutput; - private String serializedData; // logical node - private Boolean interQuery; - private List<FetchImpl> fetches; - private Boolean shouldDie; - private QueryContext queryContext; - private DataChannel dataChannel; - private Enforcer enforcer; - - private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance(); - private QueryUnitRequestProto.Builder builder = null; - private boolean viaProto = false; - - public QueryUnitRequestImpl() { - builder = QueryUnitRequestProto.newBuilder(); - this.id = null; - this.isUpdated = false; - } - - public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments, - String outputTable, boolean clusteredOutput, - String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) { - this(); - this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer); - } - - public QueryUnitRequestImpl(QueryUnitRequestProto proto) { - this.proto = proto; - viaProto = true; - id = null; - isUpdated = false; - } - - public void set(QueryUnitAttemptId id, List<FragmentProto> fragments, - String outputTable, boolean clusteredOutput, - String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) { - this.id = id; - this.fragments = fragments; - this.outputTable = outputTable; - this.clusteredOutput = clusteredOutput; - this.serializedData = serializedData; - this.isUpdated = true; - this.queryContext = queryContext; - this.queryContext = queryContext; - this.dataChannel = dataChannel; - this.enforcer = enforcer; - } - - @Override - public QueryUnitRequestProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public QueryUnitAttemptId getId() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (id != null) { - return this.id; - } - if (!p.hasId()) { - return null; - } - this.id = new QueryUnitAttemptId(p.getId()); - return this.id; - } - - @Override - public List<FragmentProto> getFragments() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (fragments != null) { - return fragments; - } - if (fragments == null) { - fragments = new ArrayList<FragmentProto>(); - } - for (int i = 0; i < p.getFragmentsCount(); i++) { - fragments.add(p.getFragments(i)); - } - return this.fragments; - } - - @Override - public String getOutputTableId() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (outputTable != null) { - return this.outputTable; - } - if (!p.hasOutputTable()) { - return null; - } - this.outputTable = p.getOutputTable(); - return this.outputTable; - } - - @Override - public boolean isClusteredOutput() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (isUpdated) { - return this.clusteredOutput; - } - if (!p.hasClusteredOutput()) { - return false; - } - this.clusteredOutput = p.getClusteredOutput(); - this.isUpdated = true; - return this.clusteredOutput; - } - - @Override - public String getSerializedData() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (this.serializedData != null) { - return this.serializedData; - } - if (!p.hasSerializedData()) { - return null; - } - this.serializedData = p.getSerializedData(); - return this.serializedData; - } - - public boolean isInterQuery() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (interQuery != null) { - return interQuery; - } - if (!p.hasInterQuery()) { - return false; - } - this.interQuery = p.getInterQuery(); - return this.interQuery; - } - - public void setInterQuery() { - maybeInitBuilder(); - this.interQuery = true; - } - - public void addFetch(String name, FetchImpl fetch) { - maybeInitBuilder(); - initFetches(); - fetch.setName(name); - fetches.add(fetch); - } - - public QueryContext getQueryContext(TajoConf conf) { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (queryContext != null) { - return queryContext; - } - if (!p.hasQueryContext()) { - return null; - } - this.queryContext = new QueryContext(conf, p.getQueryContext()); - return this.queryContext; - } - - public void setQueryContext(QueryContext queryContext) { - maybeInitBuilder(); - this.queryContext = queryContext; - } - - public void setDataChannel(DataChannel dataChannel) { - maybeInitBuilder(); - this.dataChannel = dataChannel; - } - - @Override - public DataChannel getDataChannel() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (dataChannel != null) { - return dataChannel; - } - if (!p.hasDataChannel()) { - return null; - } - this.dataChannel = new DataChannel(p.getDataChannel()); - return this.dataChannel; - } - - @Override - public Enforcer getEnforcer() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (enforcer != null) { - return enforcer; - } - if (!p.hasEnforcer()) { - return null; - } - this.enforcer = new Enforcer(p.getEnforcer()); - return this.enforcer; - } - - public List<FetchImpl> getFetches() { - initFetches(); - - return this.fetches; - } - - private void initFetches() { - if (this.fetches != null) { - return; - } - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - this.fetches = new ArrayList<FetchImpl>(); - for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) { - fetches.add(new FetchImpl(fetch)); - } - } - - @Override - public boolean shouldDie() { - QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder; - if (shouldDie != null) { - return shouldDie; - } - if (!p.hasShouldDie()) { - return false; - } - this.shouldDie = p.getShouldDie(); - return this.shouldDie; - } - - @Override - public void setShouldDie() { - maybeInitBuilder(); - shouldDie = true; - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = QueryUnitRequestProto.newBuilder(proto); - } - viaProto = true; - } - - private void mergeLocalToBuilder() { - if (id != null) { - builder.setId(this.id.getProto()); - } - if (fragments != null) { - for (int i = 0; i < fragments.size(); i++) { - builder.addFragments(fragments.get(i)); - } - } - if (this.outputTable != null) { - builder.setOutputTable(this.outputTable); - } - if (this.isUpdated) { - builder.setClusteredOutput(this.clusteredOutput); - } - if (this.serializedData != null) { - builder.setSerializedData(this.serializedData); - } - if (this.interQuery != null) { - builder.setInterQuery(this.interQuery); - } - if (this.fetches != null) { - for (int i = 0; i < fetches.size(); i++) { - builder.addFetches(fetches.get(i).getProto()); - } - } - if (this.shouldDie != null) { - builder.setShouldDie(this.shouldDie); - } - if (this.queryContext != null) { - builder.setQueryContext(queryContext.getProto()); - } - if (this.dataChannel != null) { - builder.setDataChannel(dataChannel.getProto()); - } - if (this.enforcer != null) { - builder.setEnforcer(enforcer.getProto()); - } - } - - private void mergeLocalToProto() { - if(viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java new file mode 100644 index 0000000..a3e586a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.tajo.engine.query; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.worker.FetchImpl; + +import java.util.List; + +public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestProto> { + + public TaskAttemptId getId(); + public List<CatalogProtos.FragmentProto> getFragments(); + public String getOutputTableId(); + public boolean isClusteredOutput(); + public String getSerializedData(); + public boolean isInterQuery(); + public void setInterQuery(); + public void addFetch(String name, FetchImpl fetch); + public List<FetchImpl> getFetches(); + public boolean shouldDie(); + public void setShouldDie(); + public QueryContext getQueryContext(TajoConf conf); + public DataChannel getDataChannel(); + public Enforcer getEnforcer(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java new file mode 100644 index 0000000..cef5488 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.query; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.planner.global.DataChannel; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder; +import org.apache.tajo.worker.FetchImpl; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + +public class TaskRequestImpl implements TaskRequest { + + private TaskAttemptId id; + private List<FragmentProto> fragments; + private String outputTable; + private boolean isUpdated; + private boolean clusteredOutput; + private String serializedData; // logical node + private Boolean interQuery; + private List<FetchImpl> fetches; + private Boolean shouldDie; + private QueryContext queryContext; + private DataChannel dataChannel; + private Enforcer enforcer; + + private TaskRequestProto proto = TajoWorkerProtocol.TaskRequestProto.getDefaultInstance(); + private TajoWorkerProtocol.TaskRequestProto.Builder builder = null; + private boolean viaProto = false; + + public TaskRequestImpl() { + builder = TaskRequestProto.newBuilder(); + this.id = null; + this.isUpdated = false; + } + + public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments, + String outputTable, boolean clusteredOutput, + String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) { + this(); + this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer); + } + + public TaskRequestImpl(TaskRequestProto proto) { + this.proto = proto; + viaProto = true; + id = null; + isUpdated = false; + } + + public void set(TaskAttemptId id, List<FragmentProto> fragments, + String outputTable, boolean clusteredOutput, + String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) { + this.id = id; + this.fragments = fragments; + this.outputTable = outputTable; + this.clusteredOutput = clusteredOutput; + this.serializedData = serializedData; + this.isUpdated = true; + this.queryContext = queryContext; + this.queryContext = queryContext; + this.dataChannel = dataChannel; + this.enforcer = enforcer; + } + + @Override + public TaskRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public TaskAttemptId getId() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (id != null) { + return this.id; + } + if (!p.hasId()) { + return null; + } + this.id = new TaskAttemptId(p.getId()); + return this.id; + } + + @Override + public List<FragmentProto> getFragments() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (fragments != null) { + return fragments; + } + if (fragments == null) { + fragments = new ArrayList<FragmentProto>(); + } + for (int i = 0; i < p.getFragmentsCount(); i++) { + fragments.add(p.getFragments(i)); + } + return this.fragments; + } + + @Override + public String getOutputTableId() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (outputTable != null) { + return this.outputTable; + } + if (!p.hasOutputTable()) { + return null; + } + this.outputTable = p.getOutputTable(); + return this.outputTable; + } + + @Override + public boolean isClusteredOutput() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (isUpdated) { + return this.clusteredOutput; + } + if (!p.hasClusteredOutput()) { + return false; + } + this.clusteredOutput = p.getClusteredOutput(); + this.isUpdated = true; + return this.clusteredOutput; + } + + @Override + public String getSerializedData() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.serializedData != null) { + return this.serializedData; + } + if (!p.hasSerializedData()) { + return null; + } + this.serializedData = p.getSerializedData(); + return this.serializedData; + } + + public boolean isInterQuery() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (interQuery != null) { + return interQuery; + } + if (!p.hasInterQuery()) { + return false; + } + this.interQuery = p.getInterQuery(); + return this.interQuery; + } + + public void setInterQuery() { + maybeInitBuilder(); + this.interQuery = true; + } + + public void addFetch(String name, FetchImpl fetch) { + maybeInitBuilder(); + initFetches(); + fetch.setName(name); + fetches.add(fetch); + } + + public QueryContext getQueryContext(TajoConf conf) { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (queryContext != null) { + return queryContext; + } + if (!p.hasQueryContext()) { + return null; + } + this.queryContext = new QueryContext(conf, p.getQueryContext()); + return this.queryContext; + } + + public void setQueryContext(QueryContext queryContext) { + maybeInitBuilder(); + this.queryContext = queryContext; + } + + public void setDataChannel(DataChannel dataChannel) { + maybeInitBuilder(); + this.dataChannel = dataChannel; + } + + @Override + public DataChannel getDataChannel() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (dataChannel != null) { + return dataChannel; + } + if (!p.hasDataChannel()) { + return null; + } + this.dataChannel = new DataChannel(p.getDataChannel()); + return this.dataChannel; + } + + @Override + public Enforcer getEnforcer() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (enforcer != null) { + return enforcer; + } + if (!p.hasEnforcer()) { + return null; + } + this.enforcer = new Enforcer(p.getEnforcer()); + return this.enforcer; + } + + public List<FetchImpl> getFetches() { + initFetches(); + + return this.fetches; + } + + private void initFetches() { + if (this.fetches != null) { + return; + } + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + this.fetches = new ArrayList<FetchImpl>(); + for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) { + fetches.add(new FetchImpl(fetch)); + } + } + + @Override + public boolean shouldDie() { + TaskRequestProtoOrBuilder p = viaProto ? proto : builder; + if (shouldDie != null) { + return shouldDie; + } + if (!p.hasShouldDie()) { + return false; + } + this.shouldDie = p.getShouldDie(); + return this.shouldDie; + } + + @Override + public void setShouldDie() { + maybeInitBuilder(); + shouldDie = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(proto); + } + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (id != null) { + builder.setId(this.id.getProto()); + } + if (fragments != null) { + for (int i = 0; i < fragments.size(); i++) { + builder.addFragments(fragments.get(i)); + } + } + if (this.outputTable != null) { + builder.setOutputTable(this.outputTable); + } + if (this.isUpdated) { + builder.setClusteredOutput(this.clusteredOutput); + } + if (this.serializedData != null) { + builder.setSerializedData(this.serializedData); + } + if (this.interQuery != null) { + builder.setInterQuery(this.interQuery); + } + if (this.fetches != null) { + for (int i = 0; i < fetches.size(); i++) { + builder.addFetches(fetches.get(i).getProto()); + } + } + if (this.shouldDie != null) { + builder.setShouldDie(this.shouldDie); + } + if (this.queryContext != null) { + builder.setQueryContext(queryContext.getProto()); + } + if (this.dataChannel != null) { + builder.setDataChannel(dataChannel.getProto()); + } + if (this.enforcer != null) { + builder.setEnforcer(enforcer.getProto()); + } + } + + private void mergeLocalToProto() { + if(viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 01137aa..d9d496e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -28,18 +28,18 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.RackResolver; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.QueryUnitRequest; -import org.apache.tajo.engine.query.QueryUnitRequestImpl; +import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.QueryUnit; -import org.apache.tajo.master.querymaster.QueryUnitAttempt; +import org.apache.tajo.master.querymaster.Task; +import org.apache.tajo.master.querymaster.TaskAttempt; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.DataLocation; @@ -114,14 +114,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { super.start(); } - private static final QueryUnitAttemptId NULL_ATTEMPT_ID; - public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq; + private static final TaskAttemptId NULL_ATTEMPT_ID; + public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); - TajoWorkerProtocol.QueryUnitRequestProto.Builder builder = - TajoWorkerProtocol.QueryUnitRequestProto.newBuilder(); + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); builder.setId(NULL_ATTEMPT_ID.getProto()); builder.setShouldDie(true); builder.setOutputTable(""); @@ -191,8 +191,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if (event instanceof FragmentScheduleEvent) { FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; if (context.isLeafQuery()) { - QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(); - QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++); + TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); + Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); task.addFragment(castEvent.getLeftFragment(), true); scheduledObjectNum++; if (castEvent.hasRightFragments()) { @@ -216,8 +216,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } else if (event instanceof FetchScheduleEvent) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); - QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(); - QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++); + TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); + Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); scheduledObjectNum++; for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { task.addFetches(eachFetch.getKey(), eachFetch.getValue()); @@ -230,8 +230,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); } subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); - } else if (event instanceof QueryUnitAttemptScheduleEvent) { - QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event; + } else if (event instanceof TaskAttemptToSchedulerEvent) { + TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; if (context.isLeafQuery()) { scheduledRequests.addLeafTask(castEvent); } else { @@ -240,12 +240,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler. - // This event is triggered by QueryUnitAttempt. - QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event; - scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId()); - LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName()); - ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle( - new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED)); + // This event is triggered by TaskAttempt. + TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; + scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); + LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName()); + ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle( + new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED)); } } @@ -337,8 +337,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private final String host; private final String rack; /** A key is disk volume, and a value is a list of tasks to be scheduled. */ - private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume = - Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>()); + private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume = + Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>()); /** A value is last assigned volume id for each task runner */ private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId, Integer>(); @@ -360,11 +360,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { this.rack = rack; } - public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){ + public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){ synchronized (unassignedTaskForEachVolume){ - LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId); + LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); if (list == null) { - list = new LinkedHashSet<QueryUnitAttempt>(); + list = new LinkedHashSet<TaskAttempt>(); unassignedTaskForEachVolume.put(volumeId, list); } list.add(attemptId); @@ -381,9 +381,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { * 2. unknown block or Non-splittable task in host * 3. remote tasks. unassignedTaskForEachVolume is only contained local task. so it will be null */ - public synchronized QueryUnitAttemptId getLocalTask(TajoContainerId containerId) { + public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) { int volumeId; - QueryUnitAttemptId queryUnitAttemptId = null; + TaskAttemptId taskAttemptId = null; if (!lastAssignedVolumeId.containsKey(containerId)) { volumeId = getLowestVolumeId(); @@ -396,7 +396,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { int retry = unassignedTaskForEachVolume.size(); do { //clean and get a remaining local task - queryUnitAttemptId = getAndRemove(volumeId); + taskAttemptId = getAndRemove(volumeId); if(!unassignedTaskForEachVolume.containsKey(volumeId)) { decreaseConcurrency(containerId); if (volumeId > REMOTE) { @@ -404,7 +404,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } - if (queryUnitAttemptId == null) { + if (taskAttemptId == null) { //reassign next volume volumeId = getLowestVolumeId(); increaseConcurrency(containerId, volumeId); @@ -416,19 +416,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } else { this.remainTasksNum.set(0); } - return queryUnitAttemptId; + return taskAttemptId; } - public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) { - QueryUnitAttemptId queryUnitAttemptId = null; + public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) { + TaskAttemptId taskAttemptId = null; if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) { int retry = unassignedTaskForEachVolume.size(); do { //clean and get a remaining task int volumeId = getLowestVolumeId(); - queryUnitAttemptId = getAndRemove(volumeId); - if (queryUnitAttemptId == null) { + taskAttemptId = getAndRemove(volumeId); + if (taskAttemptId == null) { if (volumeId > REMOTE) { diskVolumeLoads.remove(volumeId); } @@ -438,29 +438,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } while (retry > 0); } - return queryUnitAttemptId; + return taskAttemptId; } - private synchronized QueryUnitAttemptId getAndRemove(int volumeId){ - QueryUnitAttemptId queryUnitAttemptId = null; - if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId; + private synchronized TaskAttemptId getAndRemove(int volumeId){ + TaskAttemptId taskAttemptId = null; + if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId; - LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId); + LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId); if(list != null && list.size() > 0){ - QueryUnitAttempt queryUnitAttempt; + TaskAttempt taskAttempt; synchronized (unassignedTaskForEachVolume) { - Iterator<QueryUnitAttempt> iterator = list.iterator(); - queryUnitAttempt = iterator.next(); + Iterator<TaskAttempt> iterator = list.iterator(); + taskAttempt = iterator.next(); iterator.remove(); } this.remainTasksNum.getAndDecrement(); - queryUnitAttemptId = queryUnitAttempt.getId(); - for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) { + taskAttemptId = taskAttempt.getId(); + for (DataLocation location : taskAttempt.getTask().getDataLocations()) { if (!this.getHost().equals(location.getHost())) { HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); if (volumeMapping != null) { - volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt); + volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt); } } } @@ -469,16 +469,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if(list == null || list.isEmpty()) { unassignedTaskForEachVolume.remove(volumeId); } - return queryUnitAttemptId; + return taskAttemptId; } - private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){ + private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return; - LinkedHashSet<QueryUnitAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); + LinkedHashSet<TaskAttempt> tasks = unassignedTaskForEachVolume.get(volumeId); if(tasks != null && tasks.size() > 0){ - tasks.remove(queryUnitAttempt); + tasks.remove(taskAttempt); remainTasksNum.getAndDecrement(); } else { unassignedTaskForEachVolume.remove(volumeId); @@ -596,14 +596,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner // if the task is not included in leafTasks and nonLeafTasks. - private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>()); - private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>()); + private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); + private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>()); private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap(); - private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); + private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap(); - private synchronized void addLeafTask(QueryUnitAttemptScheduleEvent event) { - QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt(); - List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations(); + private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) { + TaskAttempt taskAttempt = event.getTaskAttempt(); + List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); for (DataLocation location : locations) { String host = location.getHost(); @@ -614,30 +614,30 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { hostVolumeMapping = new HostVolumeMapping(host, rack); leafTaskHostMapping.put(host, hostVolumeMapping); } - hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt); + hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); if (LOG.isDebugEnabled()) { LOG.debug("Added attempt req to host " + host); } - HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); + HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack()); if (list == null) { - list = new HashSet<QueryUnitAttemptId>(); + list = new HashSet<TaskAttemptId>(); leafTasksRackMapping.put(hostVolumeMapping.getRack(), list); } - list.add(queryUnitAttempt.getId()); + list.add(taskAttempt.getId()); if (LOG.isDebugEnabled()) { LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack()); } } - leafTasks.add(queryUnitAttempt.getId()); + leafTasks.add(taskAttempt.getId()); } - private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) { - nonLeafTasks.add(event.getQueryUnitAttempt().getId()); + private void addNonLeafTask(TaskAttemptToSchedulerEvent event) { + nonLeafTasks.add(event.getTaskAttempt().getId()); } public int leafTaskNum() { @@ -648,14 +648,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { return nonLeafTasks.size(); } - public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>(); + public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>(); - private QueryUnitAttemptId allocateLocalTask(String host, TajoContainerId containerId){ + private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) { - QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId); + TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId); if(attemptId == null) break; //find remaining local task @@ -671,11 +671,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { return null; } - private QueryUnitAttemptId allocateRackTask(String host) { + private TaskAttemptId allocateRackTask(String host) { List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values()); String rack = RackResolver.resolve(host).getNetworkLocation(); - QueryUnitAttemptId attemptId = null; + TaskAttemptId attemptId = null; if (remainingTasks.size() > 0) { synchronized (scheduledRequests) { @@ -697,7 +697,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { for (HostVolumeMapping tasks : remainingTasks) { for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) { - QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack); + TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack); if (tId == null) break; @@ -713,12 +713,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { //find task in rack if (attemptId == null) { - HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack); + HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack); if (list != null) { synchronized (list) { - Iterator<QueryUnitAttemptId> iterator = list.iterator(); + Iterator<TaskAttemptId> iterator = list.iterator(); while (iterator.hasNext()) { - QueryUnitAttemptId tId = iterator.next(); + TaskAttemptId tId = iterator.next(); iterator.remove(); if (leafTasks.contains(tId)) { leafTasks.remove(tId); @@ -788,7 +788,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { ////////////////////////////////////////////////////////////////////// // disk or host-local allocation ////////////////////////////////////////////////////////////////////// - QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId); + TaskAttemptId attemptId = allocateLocalTask(host, containerId); if (attemptId == null) { // if a local task cannot be found HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host); @@ -832,8 +832,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } if (attemptId != null) { - QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId()); - QueryUnitRequest taskAssign = new QueryUnitRequestImpl( + Task task = subQuery.getTask(attemptId.getTaskId()); + TaskRequest taskAssign = new TaskRequestImpl( attemptId, new ArrayList<FragmentProto>(task.getAllFragments()), "", @@ -878,7 +878,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { taskRequest = taskRequests.pollFirst(); LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); - QueryUnitAttemptId attemptId; + TaskAttemptId attemptId; // random allocation if (nonLeafTasks.size() > 0) { synchronized (nonLeafTasks){ @@ -887,9 +887,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } LOG.debug("Assigned based on * match"); - QueryUnit task; - task = subQuery.getQueryUnit(attemptId.getQueryUnitId()); - QueryUnitRequest taskAssign = new QueryUnitRequestImpl( + Task task; + task = subQuery.getTask(attemptId.getTaskId()); + TaskRequest taskAssign = new TaskRequestImpl( attemptId, Lists.newArrayList(task.getAllFragments()), "", http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index cc99453..0ab19db 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -24,19 +24,19 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.QueryUnitRequest; -import org.apache.tajo.engine.query.QueryUnitRequestImpl; +import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.engine.query.TaskRequestImpl; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.QueryUnit; -import org.apache.tajo.master.querymaster.QueryUnitAttempt; +import org.apache.tajo.master.querymaster.Task; +import org.apache.tajo.master.querymaster.TaskAttempt; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.StorageManager; @@ -126,14 +126,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { super.start(); } - private static final QueryUnitAttemptId NULL_ATTEMPT_ID; - public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq; + private static final TaskAttemptId NULL_ATTEMPT_ID; + public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); - TajoWorkerProtocol.QueryUnitRequestProto.Builder builder = - TajoWorkerProtocol.QueryUnitRequestProto.newBuilder(); + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); builder.setId(NULL_ATTEMPT_ID.getProto()); builder.setShouldDie(true); builder.setOutputTable(""); @@ -214,9 +214,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } else if (event instanceof FetchScheduleEvent) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; scheduledFetches.addFetch(castEvent.getFetches()); - } else if (event instanceof QueryUnitAttemptScheduleEvent) { - QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event; - assignTask(castEvent.getContext(), castEvent.getQueryUnitAttempt()); + } else if (event instanceof TaskAttemptToSchedulerEvent) { + TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; + assignTask(castEvent.getContext(), castEvent.getTaskAttempt()); } } } @@ -360,9 +360,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { } String host = container.getTaskHostName(); - QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID, + TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID, host, taskRequest.getCallback()); - QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++); + Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); FragmentPair fragmentPair; List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>(); @@ -467,23 +467,23 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { LOG.debug("Assigned based on * match"); ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer( taskRequest.getContainerId()); - QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID, + TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID, container.getTaskHostName(), taskRequest.getCallback()); - QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++); + Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); task.setFragment(scheduledFragments.getAllFragments()); subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } } } - private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) { - QueryUnitAttemptId attemptId = taskAttempt.getId(); - QueryUnitRequest taskAssign = new QueryUnitRequestImpl( + private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) { + TaskAttemptId attemptId = taskAttempt.getId(); + TaskRequest taskAssign = new TaskRequestImpl( attemptId, - new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()), + new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()), "", false, - taskAttempt.getQueryUnit().getLogicalPlan().toJson(), + taskAttempt.getTask().getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), subQuery.getDataChannel(), subQuery.getBlock().getEnforcer()); if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java index 64081f3..aced80c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java @@ -21,8 +21,8 @@ package org.apache.tajo.master; import com.google.protobuf.ByteString; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.conf.TajoConf; @@ -84,7 +84,7 @@ public class NonForwardQueryResultScanner { FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{})); this.taskContext = new TaskAttemptContext( new QueryContext(tajoConf), null, - new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0), + new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0), fragmentProtos, null); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 0d2acf7..4649d99 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ContainerProtocol; @@ -75,7 +75,7 @@ public class TajoContainerProxy extends ContainerProxy { * * @param taskAttemptId The TaskAttemptId to be killed. */ - public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) { + public void killTaskAttempt(TaskAttemptId taskAttemptId) { NettyClientBase tajoWorkerRpc = null; try { InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java index 4d10efe..183aeb5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.event; public enum ContainerAllocatorEventType { - // producer: QueryUnitAttempt, consumer: ContainerAllocator + // producer: TaskAttempt, consumer: ContainerAllocator CONTAINER_REQ, CONTAINER_DEALLOCATE, CONTAINER_FAILED http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java index cab2202..5cf9887 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java @@ -19,24 +19,24 @@ package org.apache.tajo.master.event; import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.master.container.TajoContainerId; /** * This event is sent to a running TaskAttempt on a worker. */ public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> { - private final QueryUnitAttemptId taskAttemptId; + private final TaskAttemptId taskAttemptId; private final TajoContainerId containerId; - public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, TajoContainerId containerId, + public LocalTaskEvent(TaskAttemptId taskAttemptId, TajoContainerId containerId, LocalTaskEventType eventType) { super(eventType); this.taskAttemptId = taskAttemptId; this.containerId = containerId; } - public QueryUnitAttemptId getTaskAttemptId() { + public TaskAttemptId getTaskAttemptId() { return taskAttemptId; }
