Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f674fa8f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f674fa8f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f674fa8f Branch: refs/heads/index_support Commit: f674fa8f08622ff752ff56bc6b11a93a520e266c Parents: 8ec099c dc49049 Author: Jihoon Son <[email protected]> Authored: Tue Jun 16 07:42:08 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Tue Jun 16 07:42:08 2015 +0900 ---------------------------------------------------------------------- CHANGES | 9 + .../org/apache/tajo/jdbc/TajoResultSetBase.java | 306 ++-- .../org/apache/tajo/storage/RowStoreUtil.java | 93 +- .../java/org/apache/tajo/datum/DateDatum.java | 34 +- .../main/java/org/apache/tajo/datum/Datum.java | 7 + .../org/apache/tajo/datum/DatumFactory.java | 4 +- .../java/org/apache/tajo/datum/Float4Datum.java | 6 +- .../java/org/apache/tajo/datum/Float8Datum.java | 6 +- .../java/org/apache/tajo/datum/Int2Datum.java | 6 +- .../java/org/apache/tajo/datum/Int4Datum.java | 6 +- .../java/org/apache/tajo/datum/Int8Datum.java | 6 +- .../org/apache/tajo/datum/IntervalDatum.java | 6 +- .../java/org/apache/tajo/datum/TimeDatum.java | 22 +- .../org/apache/tajo/datum/TimestampDatum.java | 45 +- .../org/apache/tajo/storage/EmptyTuple.java | 141 +- .../java/org/apache/tajo/storage/NullTuple.java | 43 +- .../java/org/apache/tajo/storage/Tuple.java | 79 +- .../java/org/apache/tajo/storage/VTuple.java | 218 +-- .../apache/tajo/util/datetime/DateTimeUtil.java | 12 +- .../apache/tajo/datum/TestTimestampDatum.java | 2 +- .../apache/tajo/util/TestDateTimeFormat.java | 2 +- .../tajo/engine/function/builtin/AvgDouble.java | 10 +- .../tajo/engine/function/builtin/AvgFloat.java | 5 +- .../tajo/engine/function/builtin/AvgInt.java | 5 +- .../tajo/engine/function/builtin/AvgLong.java | 10 +- .../tajo/engine/function/builtin/Coalesce.java | 5 +- .../tajo/engine/function/builtin/CountRows.java | 2 +- .../engine/function/builtin/CountValue.java | 2 +- .../function/builtin/CountValueDistinct.java | 13 +- .../tajo/engine/function/builtin/Date.java | 2 +- .../tajo/engine/function/builtin/LastValue.java | 5 +- .../tajo/engine/function/builtin/Lead.java | 6 +- .../tajo/engine/function/builtin/Max.java | 10 +- .../tajo/engine/function/builtin/Min.java | 4 +- .../tajo/engine/function/builtin/RandomInt.java | 2 +- .../tajo/engine/function/builtin/SumDouble.java | 5 +- .../function/builtin/SumDoubleDistinct.java | 10 +- .../tajo/engine/function/builtin/SumFloat.java | 5 +- .../function/builtin/SumFloatDistinct.java | 10 +- .../engine/function/builtin/SumIntDistinct.java | 10 +- .../tajo/engine/function/builtin/SumLong.java | 5 +- .../function/builtin/SumLongDistinct.java | 10 +- .../tajo/engine/function/builtin/Variance.java | 13 +- .../tajo/engine/function/datetime/AddDays.java | 4 +- .../engine/function/datetime/AddMonths.java | 6 +- .../function/datetime/DatePartFromDate.java | 67 +- .../function/datetime/DatePartFromTime.java | 16 +- .../datetime/DatePartFromTimestamp.java | 14 +- .../datetime/DateTimePartFromUnixTimestamp.java | 26 +- .../function/datetime/ToCharTimestamp.java | 10 +- .../tajo/engine/function/datetime/ToDate.java | 6 +- .../function/datetime/ToTimestampInt.java | 5 +- .../function/datetime/ToTimestampText.java | 7 +- .../function/geoip/GeoIPCountryInet4.java | 5 +- .../engine/function/geoip/GeoIPCountryText.java | 5 +- .../function/geoip/GeoIPInCountryInet4.java | 6 +- .../function/geoip/GeoIPInCountryText.java | 6 +- .../function/json/JsonExtractPathText.java | 9 +- .../tajo/engine/function/math/AbsDouble.java | 5 +- .../tajo/engine/function/math/AbsFloat.java | 5 +- .../tajo/engine/function/math/AbsInt.java | 5 +- .../tajo/engine/function/math/AbsLong.java | 5 +- .../apache/tajo/engine/function/math/Acos.java | 5 +- .../apache/tajo/engine/function/math/Asin.java | 5 +- .../apache/tajo/engine/function/math/Atan.java | 5 +- .../apache/tajo/engine/function/math/Atan2.java | 6 +- .../apache/tajo/engine/function/math/Cbrt.java | 5 +- .../apache/tajo/engine/function/math/Ceil.java | 5 +- .../apache/tajo/engine/function/math/Cos.java | 5 +- .../tajo/engine/function/math/Degrees.java | 5 +- .../apache/tajo/engine/function/math/Div.java | 12 +- .../apache/tajo/engine/function/math/Exp.java | 5 +- .../apache/tajo/engine/function/math/Floor.java | 5 +- .../apache/tajo/engine/function/math/Mod.java | 12 +- .../apache/tajo/engine/function/math/Pow.java | 6 +- .../tajo/engine/function/math/Radians.java | 5 +- .../apache/tajo/engine/function/math/Round.java | 5 +- .../tajo/engine/function/math/RoundFloat8.java | 9 +- .../apache/tajo/engine/function/math/Sign.java | 5 +- .../apache/tajo/engine/function/math/Sin.java | 5 +- .../apache/tajo/engine/function/math/Sqrt.java | 5 +- .../apache/tajo/engine/function/math/Tan.java | 5 +- .../tajo/engine/function/string/Ascii.java | 7 +- .../tajo/engine/function/string/BTrim.java | 10 +- .../tajo/engine/function/string/BitLength.java | 8 +- .../tajo/engine/function/string/CharLength.java | 7 +- .../apache/tajo/engine/function/string/Chr.java | 7 +- .../tajo/engine/function/string/Concat.java | 9 +- .../tajo/engine/function/string/Concat_ws.java | 20 +- .../tajo/engine/function/string/Decode.java | 17 +- .../tajo/engine/function/string/Digest.java | 8 +- .../tajo/engine/function/string/Encode.java | 18 +- .../tajo/engine/function/string/FindInSet.java | 21 +- .../tajo/engine/function/string/InitCap.java | 7 +- .../tajo/engine/function/string/LTrim.java | 10 +- .../tajo/engine/function/string/Left.java | 14 +- .../tajo/engine/function/string/Length.java | 5 +- .../tajo/engine/function/string/Locate.java | 15 +- .../tajo/engine/function/string/Lower.java | 7 +- .../tajo/engine/function/string/Lpad.java | 23 +- .../apache/tajo/engine/function/string/Md5.java | 11 +- .../engine/function/string/OctetLength.java | 5 +- .../tajo/engine/function/string/QuoteIdent.java | 6 +- .../tajo/engine/function/string/RTrim.java | 10 +- .../engine/function/string/RegexpReplace.java | 51 +- .../tajo/engine/function/string/Repeat.java | 10 +- .../tajo/engine/function/string/Reverse.java | 7 +- .../tajo/engine/function/string/Right.java | 12 +- .../tajo/engine/function/string/Rpad.java | 31 +- .../tajo/engine/function/string/SplitPart.java | 9 +- .../tajo/engine/function/string/StrPos.java | 15 +- .../tajo/engine/function/string/StrPosb.java | 14 +- .../tajo/engine/function/string/Substr.java | 37 +- .../tajo/engine/function/string/ToBin.java | 5 +- .../tajo/engine/function/string/ToCharLong.java | 4 +- .../tajo/engine/function/string/ToHex.java | 5 +- .../tajo/engine/function/string/Upper.java | 7 +- .../tajo/engine/function/window/FirstValue.java | 6 +- .../apache/tajo/engine/function/window/Lag.java | 8 +- .../tajo/engine/function/window/Rank.java | 2 +- .../engine/planner/RangePartitionAlgorithm.java | 66 +- .../engine/planner/UniformRangePartition.java | 208 ++- .../planner/physical/BSTIndexScanExec.java | 4 + .../planner/physical/CommonHashJoinExec.java | 4 +- .../planner/physical/ComparableVector.java | 10 +- .../DistinctGroupbyFirstAggregationExec.java | 14 +- .../DistinctGroupbyHashAggregationExec.java | 10 +- .../DistinctGroupbySecondAggregationExec.java | 8 +- .../DistinctGroupbySortAggregationExec.java | 2 +- .../DistinctGroupbyThirdAggregationExec.java | 10 +- .../planner/physical/HashAggregateExec.java | 4 +- .../HashBasedColPartitionStoreExec.java | 4 +- .../engine/planner/physical/HashJoinExec.java | 6 +- .../planner/physical/HashLeftOuterJoinExec.java | 3 +- .../planner/physical/HashPartitioner.java | 2 +- .../physical/HashShuffleFileWriteExec.java | 2 +- .../planner/physical/JoinTupleComparator.java | 4 +- .../engine/planner/physical/PhysicalExec.java | 2 +- .../physical/RangeShuffleFileWriteExec.java | 21 +- .../planner/physical/SortAggregateExec.java | 9 +- .../SortBasedColPartitionStoreExec.java | 5 +- .../engine/planner/physical/WindowAggExec.java | 8 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 4 +- .../NonForwardQueryResultSystemScanner.java | 19 +- .../apache/tajo/master/exec/QueryExecutor.java | 2 +- .../tajo/master/rm/TajoResourceTracker.java | 4 +- .../tajo/util/TajoUncaughtExceptionHandler.java | 70 + .../apache/tajo/util/history/HistoryWriter.java | 2 +- .../tajo/worker/ExecutionBlockContext.java | 83 +- .../org/apache/tajo/worker/LegacyTaskImpl.java | 844 ++++++++++ .../apache/tajo/worker/NodeResourceManager.java | 45 +- .../apache/tajo/worker/NodeStatusUpdater.java | 34 +- .../java/org/apache/tajo/worker/TajoWorker.java | 47 +- .../tajo/worker/TajoWorkerManagerService.java | 9 +- .../main/java/org/apache/tajo/worker/Task.java | 1502 ++++++++---------- .../apache/tajo/worker/TaskAttemptContext.java | 61 +- .../org/apache/tajo/worker/TaskContainer.java | 85 + .../org/apache/tajo/worker/TaskExecutor.java | 194 +++ .../java/org/apache/tajo/worker/TaskImpl.java | 837 ++++++++++ .../org/apache/tajo/worker/TaskManager.java | 180 +++ .../java/org/apache/tajo/worker/TaskRunner.java | 10 +- .../apache/tajo/worker/TaskRunnerHistory.java | 1 + .../apache/tajo/worker/TaskRunnerManager.java | 12 +- .../worker/event/ExecutionBlockStartEvent.java | 35 + .../worker/event/ExecutionBlockStopEvent.java | 37 + .../worker/event/NodeResourceAllocateEvent.java | 2 +- .../event/NodeResourceDeallocateEvent.java | 2 +- .../tajo/worker/event/NodeResourceEvent.java | 35 + .../worker/event/NodeResourceManagerEvent.java | 34 - .../tajo/worker/event/NodeStatusEvent.java | 11 +- .../tajo/worker/event/TaskExecutorEvent.java | 44 + .../tajo/worker/event/TaskManagerEvent.java | 43 + .../tajo/worker/event/TaskRunnerEvent.java | 1 + .../tajo/worker/event/TaskRunnerStartEvent.java | 44 +- .../tajo/worker/event/TaskRunnerStopEvent.java | 1 + .../tajo/worker/event/TaskStartEvent.java | 44 + .../tajo/ws/rs/resources/QueryResource.java | 37 +- .../ws/rs/resources/QueryResultResource.java | 21 +- .../src/main/proto/TajoWorkerProtocol.proto | 1 + .../apache/tajo/engine/eval/ExprTestBase.java | 11 +- .../tajo/engine/eval/TestEvalTreeUtil.java | 4 +- .../tajo/engine/eval/TestSQLExpression.java | 7 +- .../planner/TestUniformRangePartition.java | 216 +-- .../planner/physical/TestBNLJoinExec.java | 10 +- .../planner/physical/TestExternalSortExec.java | 2 +- .../physical/TestFullOuterHashJoinExec.java | 6 +- .../physical/TestFullOuterMergeJoinExec.java | 8 +- .../planner/physical/TestHashAntiJoinExec.java | 10 +- .../planner/physical/TestHashJoinExec.java | 10 +- .../planner/physical/TestHashPartitioner.java | 10 +- .../planner/physical/TestHashSemiJoinExec.java | 10 +- .../physical/TestLeftOuterHashJoinExec.java | 6 +- .../planner/physical/TestMergeJoinExec.java | 10 +- .../engine/planner/physical/TestNLJoinExec.java | 10 +- .../planner/physical/TestPhysicalPlanner.java | 68 +- .../physical/TestProgressExternalSortExec.java | 2 +- .../physical/TestRightOuterHashJoinExec.java | 6 +- .../physical/TestRightOuterMergeJoinExec.java | 8 +- .../engine/planner/physical/TestSortExec.java | 8 +- .../planner/physical/TestTupleSorter.java | 4 +- .../apache/tajo/engine/util/TestTupleUtil.java | 23 +- .../TestNonForwardQueryResultSystemScanner.java | 16 +- .../apache/tajo/querymaster/TestKillQuery.java | 135 +- .../org/apache/tajo/storage/TestRowFile.java | 4 +- .../apache/tajo/worker/MockExecutionBlock.java | 42 + .../tajo/worker/MockNodeResourceManager.java | 96 ++ .../tajo/worker/MockNodeStatusUpdater.java | 4 +- .../apache/tajo/worker/MockTaskExecutor.java | 141 ++ .../org/apache/tajo/worker/MockTaskManager.java | 59 + .../apache/tajo/worker/MockWorkerContext.java | 129 ++ .../org/apache/tajo/worker/TestFetcher.java | 14 +- .../tajo/worker/TestNodeResourceManager.java | 135 +- .../tajo/worker/TestNodeStatusUpdater.java | 54 +- .../apache/tajo/worker/TestTaskExecutor.java | 330 ++++ .../org/apache/tajo/worker/TestTaskManager.java | 185 +++ .../tajo/ws/rs/resources/TestQueryResource.java | 2 +- .../rs/resources/TestQueryResultResource.java | 2 +- .../org/apache/tajo/jdbc/MetaDataTuple.java | 57 +- .../apache/tajo/jdbc/TajoMetaDataResultSet.java | 14 +- .../org/apache/tajo/plan/ExprAnnotator.java | 2 +- .../org/apache/tajo/plan/expr/FieldEval.java | 2 +- .../plan/function/PythonAggFunctionInvoke.java | 2 +- .../function/python/PythonScriptEngine.java | 4 +- .../plan/function/stream/CSVLineSerializer.java | 12 +- .../stream/TextFieldSerializerDeserializer.java | 4 +- .../tajo/storage/BaseTupleComparator.java | 4 +- .../storage/BinarySerializerDeserializer.java | 52 +- .../org/apache/tajo/storage/FrameTuple.java | 76 +- .../java/org/apache/tajo/storage/LazyTuple.java | 57 +- .../org/apache/tajo/storage/RowStoreUtil.java | 42 +- .../tajo/storage/SerializerDeserializer.java | 7 +- .../apache/tajo/storage/TableStatistics.java | 31 +- .../storage/TextSerializerDeserializer.java | 67 +- .../org/apache/tajo/storage/TupleRange.java | 2 - .../apache/tajo/tuple/offheap/HeapTuple.java | 51 +- .../apache/tajo/tuple/offheap/UnSafeTuple.java | 55 +- .../org/apache/tajo/storage/TestFrameTuple.java | 14 +- .../org/apache/tajo/storage/TestLazyTuple.java | 26 +- .../tajo/storage/TestTupleComparator.java | 7 +- .../org/apache/tajo/storage/TestVTuple.java | 23 +- .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 26 +- .../storage/hbase/AbstractHBaseAppender.java | 15 +- .../HBaseBinarySerializerDeserializer.java | 35 + .../tajo/storage/hbase/HBasePutAppender.java | 6 +- .../tajo/storage/hbase/HBaseTablespace.java | 2 +- .../hbase/HBaseTextSerializerDeserializer.java | 9 + .../java/org/apache/tajo/storage/CSVFile.java | 9 +- .../storage/FieldSerializerDeserializer.java | 7 +- .../java/org/apache/tajo/storage/RawFile.java | 4 +- .../java/org/apache/tajo/storage/RowFile.java | 28 +- .../apache/tajo/storage/avro/AvroAppender.java | 5 +- .../tajo/storage/json/JsonLineSerializer.java | 2 +- .../tajo/storage/parquet/ParquetAppender.java | 2 +- .../storage/parquet/TajoRecordConverter.java | 2 +- .../tajo/storage/parquet/TajoWriteSupport.java | 26 +- .../org/apache/tajo/storage/rcfile/RCFile.java | 22 +- .../sequencefile/SequenceFileAppender.java | 23 +- .../sequencefile/SequenceFileScanner.java | 3 +- .../tajo/storage/text/CSVLineDeserializer.java | 11 +- .../tajo/storage/text/CSVLineSerializer.java | 5 +- .../text/TextFieldSerializerDeserializer.java | 38 +- .../tajo/storage/TestDelimitedTextFile.java | 4 +- .../tajo/storage/TestFileStorageManager.java | 5 +- .../apache/tajo/storage/TestFileSystems.java | 9 +- .../apache/tajo/storage/TestMergeScanner.java | 20 +- .../org/apache/tajo/storage/TestStorages.java | 57 +- .../apache/tajo/storage/index/TestBSTIndex.java | 92 +- .../index/TestSingleCSVFileBSTIndex.java | 24 +- .../apache/tajo/storage/json/TestJsonSerDe.java | 3 +- .../tajo/storage/parquet/TestReadWrite.java | 2 +- 270 files changed, 6628 insertions(+), 3121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/CHANGES ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index 712fc6d,bc6975a..4ed1313 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@@ -233,15 -114,13 +233,19 @@@ public class BSTIndexScanExec extends P } } else { while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) { ++ LOG.info("while: " + tuple); if (qual.eval(tuple).isTrue()) { projector.eval(tuple, outTuple); ++ LOG.info("return: " + outTuple); return outTuple; } else { long offset = reader.next(); - if (offset == -1) return null; ++ LOG.info("offset: " + offset); + if (offset == -1) { + return null; + } else fileScanner.seek(offset); ++ return null; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 7cb1716,bc0d212..98c5de6 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@@ -23,14 -23,9 +23,13 @@@ import com.google.protobuf.ByteString import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; - import org.apache.tajo.SessionVars; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/worker/Task.java index ea04a48,c849940..d2afb83 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@@ -18,853 -18,35 +18,663 @@@ package org.apache.tajo.worker; - import com.google.common.annotations.VisibleForTesting; - import com.google.common.collect.Lists; - import com.google.common.collect.Maps; - import io.netty.handler.codec.http.QueryStringDecoder; - import org.apache.commons.lang.exception.ExceptionUtils; - import org.apache.commons.logging.Log; - import org.apache.commons.logging.LogFactory; - import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.fs.CommonConfigurationKeysPublic; - import org.apache.hadoop.fs.FileStatus; - import org.apache.hadoop.fs.FileSystem; - import org.apache.hadoop.fs.Path; - import org.apache.tajo.TajoProtos; - import org.apache.tajo.TajoProtos.TaskAttemptState; - import org.apache.tajo.TaskAttemptId; - import org.apache.tajo.catalog.Schema; - import org.apache.tajo.catalog.TableDesc; - import org.apache.tajo.catalog.TableMeta; - import org.apache.tajo.catalog.proto.CatalogProtos; - import org.apache.tajo.catalog.statistics.TableStats; - import org.apache.tajo.conf.TajoConf; - import org.apache.tajo.engine.planner.physical.PhysicalExec; - import org.apache.tajo.engine.query.QueryContext; - import org.apache.tajo.engine.query.TaskRequest; - import org.apache.tajo.ipc.QueryMasterProtocol; - import org.apache.tajo.ipc.TajoWorkerProtocol.*; - import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; - import org.apache.tajo.master.cluster.WorkerConnectionInfo; - import org.apache.tajo.plan.function.python.TajoScriptEngine; - import org.apache.tajo.plan.logical.*; - import org.apache.tajo.plan.serder.LogicalNodeDeserializer; - import org.apache.tajo.plan.util.PlannerUtil; - import org.apache.tajo.pullserver.TajoPullServerService; - import org.apache.tajo.pullserver.retriever.FileChunk; - import org.apache.tajo.rpc.NullCallback; - import org.apache.tajo.storage.*; - import org.apache.tajo.storage.fragment.FileFragment; - import org.apache.tajo.util.NetUtils; + import org.apache.tajo.ipc.TajoWorkerProtocol; - import java.io.File; import java.io.IOException; - import java.net.InetAddress; - import java.net.URI; - import java.util.*; - import java.util.Map.Entry; - import java.util.concurrent.ExecutorService; - import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; - - public class Task { - private static final Log LOG = LogFactory.getLog(Task.class); - private static final float FETCHER_PROGRESS = 0.5f; - - private final TajoConf systemConf; - private final QueryContext queryContext; - private final ExecutionBlockContext executionBlockContext; - private final TaskAttemptId taskId; - private final String taskRunnerId; - - private final Path taskDir; - private final TaskRequest request; - private TaskAttemptContext context; - private List<Fetcher> fetcherRunners; - private LogicalNode plan; - private final Map<String, TableDesc> descs = Maps.newHashMap(); - private PhysicalExec executor; - private boolean interQuery; - private Path inputTableBaseDir; - - private long startTime; - private long finishTime; - - private final TableStats inputStats; - private List<FileChunk> localChunks; - - // TODO - to be refactored - private ShuffleType shuffleType = null; - private Schema finalSchema = null; - private TupleComparator sortComp = null; - - public Task(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); - } - - public Task(String taskRunnerId, - Path baseDir, - TaskAttemptId taskId, - TajoConf conf, - final ExecutionBlockContext executionBlockContext, - final TaskRequest request) throws IOException { - this.taskRunnerId = taskRunnerId; - this.request = request; - this.taskId = taskId; - - this.systemConf = conf; - this.queryContext = request.getQueryContext(systemConf); - this.executionBlockContext = executionBlockContext; - this.taskDir = StorageUtil.concatPath(baseDir, - taskId.getTaskId().getId() + "_" + taskId.getId()); - - this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, - request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); - this.context.setDataChannel(request.getDataChannel()); - this.context.setEnforcer(request.getEnforcer()); - this.context.setState(TaskAttemptState.TA_PENDING); - this.inputStats = new TableStats(); - this.fetcherRunners = Lists.newArrayList(); - } - - public void initPlan() throws IOException { - plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); - updateDescsForScanNodes(NodeType.SCAN); - updateDescsForScanNodes(NodeType.PARTITIONS_SCAN); - updateDescsForScanNodes(NodeType.INDEX_SCAN); - LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); - if (scanNode != null) { - for (LogicalNode node : scanNode) { - ScanNode scan = (ScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); - if (partitionScanNode != null) { - for (LogicalNode node : partitionScanNode) { - PartitionedTableScanNode scan = (PartitionedTableScanNode) node; - descs.put(scan.getCanonicalName(), scan.getTableDesc()); - } - } - - interQuery = request.getProto().getInterQuery(); - if (interQuery) { - context.setInterQuery(); - this.shuffleType = context.getDataChannel().getShuffleType(); - - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); - this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); - this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); - } - } else { - Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) - .getAppenderFilePath(taskId, queryContext.getStagingDir()); - LOG.info("Output File Path: " + outFilePath); - context.setOutputPath(outFilePath); - } - - this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); - LOG.info("=================================="); - LOG.info("* Stage " + request.getId() + " is initialized"); - LOG.info("* InterQuery: " + interQuery - + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + - ", Fragments (num: " + request.getFragments().size() + ")" + - ", Fetches (total:" + request.getFetches().size() + ") :"); - - if(LOG.isDebugEnabled()) { - for (FetchImpl f : request.getFetches()) { - LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); - } - } - LOG.info("* Local task dir: " + taskDir); - if(LOG.isDebugEnabled()) { - LOG.debug("* plan:\n"); - LOG.debug(plan.toString()); - } - LOG.info("=================================="); - } - - private void updateDescsForScanNodes(NodeType nodeType) { - assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN; - LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType); - if (scanNodes != null) { - for (LogicalNode node : scanNodes) { - ScanNode scanNode = (ScanNode) node; - descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); - } - } - } - - private void startScriptExecutors() throws IOException { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.start(systemConf); - } - } - - private void stopScriptExecutors() { - for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { - executor.shutdown(); - } - } - - public void init() throws IOException { - initPlan(); - startScriptExecutors(); - - if (context.getState() == TaskAttemptState.TA_PENDING) { - // initialize a task temporal dir - FileSystem localFS = executionBlockContext.getLocalFS(); - localFS.mkdirs(taskDir); - - if (request.getFetches().size() > 0) { - inputTableBaseDir = localFS.makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( - getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); - localFS.mkdirs(inputTableBaseDir); - Path tableDir; - for (String inputTable : context.getInputTables()) { - tableDir = new Path(inputTableBaseDir, inputTable); - if (!localFS.exists(tableDir)) { - LOG.info("the directory is created " + tableDir.toUri()); - localFS.mkdirs(tableDir); - } - } - } - // for localizing the intermediate data - fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); - } - } - - public TaskAttemptId getTaskId() { - return taskId; - } - - public TaskAttemptId getId() { - return context.getTaskId(); - } - - public TaskAttemptState getStatus() { - return context.getState(); - } - - public String toString() { - return "queryId: " + this.getId() + " status: " + this.getStatus(); - } - - public void setState(TaskAttemptState status) { - context.setState(status); - } - - public TaskAttemptContext getContext() { - return context; - } - - public boolean hasFetchPhase() { - return fetcherRunners.size() > 0; - } - - public List<Fetcher> getFetchers() { - return new ArrayList<Fetcher>(fetcherRunners); - } - - public void fetch() { - ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); - for (Fetcher f : fetcherRunners) { - executorService.submit(new FetchRunner(context, f)); - } - } - - public void kill() { - stopScriptExecutors(); - context.setState(TaskAttemptState.TA_KILLED); - context.stop(); - } - - public void abort() { - stopScriptExecutors(); - context.stop(); - } - - public void cleanUp() { - // remove itself from worker - if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { - synchronized (executionBlockContext.getTasks()) { - executionBlockContext.getTasks().remove(this.getId()); - } - } else { - LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); - } - } - - public TaskStatusProto getReport() { - TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); - builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); - builder.setId(context.getTaskId().getProto()) - .setProgress(context.getProgress()) - .setState(context.getState()); - - builder.setInputStats(reloadInputStats()); - - if (context.getResultStats() != null) { - builder.setResultStats(context.getResultStats().getProto()); - } - return builder.build(); - } - - public boolean isRunning(){ - return context.getState() == TaskAttemptState.TA_RUNNING; - } - - public boolean isProgressChanged() { - return context.isProgressChanged(); - } - - public void updateProgress() { - if(context != null && context.isStopped()){ - return; - } - - if (executor != null && context.getProgress() < 1.0f) { - context.setExecutorProgress(executor.getProgress()); - } - } - - private CatalogProtos.TableStatsProto reloadInputStats() { - synchronized(inputStats) { - if (this.executor == null) { - return inputStats.getProto(); - } - - TableStats executorInputStats = this.executor.getInputStats(); - - if (executorInputStats != null) { - inputStats.setValues(executorInputStats); - } - return inputStats.getProto(); - } - } - - private TaskCompletionReport getTaskCompletionReport() { - TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); - builder.setId(context.getTaskId().getProto()); - - builder.setInputStats(reloadInputStats()); - - if (context.hasResultStats()) { - builder.setResultStats(context.getResultStats().getProto()); - } else { - builder.setResultStats(new TableStats().getProto()); - } - - Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); - if (it.hasNext()) { - do { - Entry<Integer, String> entry = it.next(); - ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); - part.setPartId(entry.getKey()); - - // Set output volume - if (context.getPartitionOutputVolume() != null) { - for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { - if (entry.getKey().equals(e.getKey())) { - part.setVolume(e.getValue().longValue()); - break; - } - } - } - - builder.addShuffleFileOutputs(part.build()); - } while (it.hasNext()); - } - - return builder.build(); - } - - private void waitForFetch() throws InterruptedException, IOException { - context.getFetchLatch().await(); - LOG.info(context.getTaskId() + " All fetches are done!"); - Collection<String> inputs = Lists.newArrayList(context.getInputTables()); - - // Get all broadcasted tables - Set<String> broadcastTableNames = new HashSet<String>(); - List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); - if (broadcasts != null) { - for (EnforceProperty eachBroadcast : broadcasts) { - broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); - } - } - - // localize the fetched data and skip the broadcast table - for (String inputTable: inputs) { - if (broadcastTableNames.contains(inputTable)) { - continue; - } - File tableDir = new File(context.getFetchIn(), inputTable); - FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); - context.updateAssignedFragments(inputTable, frags); - } - } - - public void run() throws Exception { - startTime = System.currentTimeMillis(); - Throwable error = null; - try { - if(!context.isStopped()) { - context.setState(TaskAttemptState.TA_RUNNING); - if (context.hasFetchPhase()) { - // If the fetch is still in progress, the query unit must wait for - // complete. - waitForFetch(); - context.setFetcherProgress(FETCHER_PROGRESS); - context.setProgressChanged(true); - updateProgress(); - } - - this.executor = executionBlockContext.getTQueryEngine(). - createPlan(context, plan); - this.executor.init(); - - while(!context.isStopped() && executor.next() != null) { - } - } - } catch (Throwable e) { - error = e ; - LOG.error(e.getMessage(), e); - stopScriptExecutors(); - context.stop(); - } finally { - if (executor != null) { - try { - executor.close(); - reloadInputStats(); - } catch (IOException e) { - LOG.error(e, e); - } - this.executor = null; - } - - executionBlockContext.completedTasksNum.incrementAndGet(); - context.getHashShuffleAppenderManager().finalizeTask(taskId); - - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); - if (context.isStopped()) { - context.setExecutorProgress(0.0f); - - if (context.getState() == TaskAttemptState.TA_KILLED) { - queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); - executionBlockContext.killedTasksNum.incrementAndGet(); - } else { - context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } - - queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); - executionBlockContext.failedTasksNum.incrementAndGet(); - } - } else { - // if successful - context.setProgress(1.0f); - context.setState(TaskAttemptState.TA_SUCCEEDED); - executionBlockContext.succeededTasksNum.incrementAndGet(); - - TaskCompletionReport report = getTaskCompletionReport(); - queryMasterStub.done(null, report, NullCallback.get()); - } - finishTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); - cleanupTask(); - } - } - - public void cleanupTask() { - TaskHistory taskHistory = createTaskHistory(); - executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); - executionBlockContext.getTasks().remove(getId()); - - fetcherRunners.clear(); - fetcherRunners = null; - try { - if(executor != null) { - executor.close(); - executor = null; - } - } catch (IOException e) { - LOG.fatal(e.getMessage(), e); - } - - executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); - stopScriptExecutors(); - } - - public TaskHistory createTaskHistory() { - TaskHistory taskHistory = null; - try { - taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), - startTime, finishTime, reloadInputStats()); - - if (context.getOutputPath() != null) { - taskHistory.setOutputPath(context.getOutputPath().toString()); - } - - if (context.getWorkDir() != null) { - taskHistory.setWorkingPath(context.getWorkDir().toString()); - } - - if (context.getResultStats() != null) { - taskHistory.setOutputStats(context.getResultStats().getProto()); - } - - if (hasFetchPhase()) { - taskHistory.setTotalFetchCount(fetcherRunners.size()); - int i = 0; - FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); - for (Fetcher fetcher : fetcherRunners) { - // TODO store the fetcher histories - if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { - builder.setStartTime(fetcher.getStartTime()); - builder.setFinishTime(fetcher.getFinishTime()); - builder.setFileLength(fetcher.getFileLen()); - builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); - builder.setState(fetcher.getState()); - - taskHistory.addFetcherHistory(builder.build()); - } - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; - } - taskHistory.setFinishedFetchCount(i); - } - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - } - - return taskHistory; - } - - public int hashCode() { - return context.hashCode(); - } - - public boolean equals(Object obj) { - if (obj instanceof Task) { - Task other = (Task) obj; - return this.context.equals(other.context); - } - return false; - } - - private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) - throws IOException { - Configuration c = new Configuration(systemConf); - c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); - FileSystem fs = FileSystem.get(c); - Path tablePath = new Path(file.getAbsolutePath()); - - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus f : fileLists) { - if (f.getLen() == 0) { - continue; - } - tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); - listTablets.add(tablet); - } - - // Special treatment for locally pseudo fetched chunks - synchronized (localChunks) { - for (FileChunk chunk : localChunks) { - if (name.equals(chunk.getEbId())) { - tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); - listTablets.add(tablet); - LOG.info("One local chunk is added to listTablets"); - } - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - private class FetchRunner implements Runnable { - private final TaskAttemptContext ctx; - private final Fetcher fetcher; - private int maxRetryNum; - - public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { - this.ctx = ctx; - this.fetcher = fetcher; - this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); - } - - @Override - public void run() { - int retryNum = 0; - int retryWaitTime = 1000; //sec - - try { // for releasing fetch latch - while(!context.isStopped() && retryNum < maxRetryNum) { - if (retryNum > 0) { - try { - Thread.sleep(retryWaitTime); - retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds - } catch (InterruptedException e) { - LOG.error(e); - } - LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); - } - try { - FileChunk fetched = fetcher.get(); - if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null - && fetched.getFile() != null) { - if (fetched.fromRemote() == false) { - localChunks.add(fetched); - LOG.info("Add a new FileChunk to local chunk list"); - } - break; - } - } catch (Throwable e) { - LOG.error("Fetch failed: " + fetcher.getURI(), e); - } - retryNum++; - } - } finally { - if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ - fetcherFinished(ctx); - } else { - if (retryNum == maxRetryNum) { - LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); - } - stopScriptExecutors(); - context.stop(); // retry task - ctx.getFetchLatch().countDown(); - } - } - } - } - - @VisibleForTesting - public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { - if (totalFetcher > 0) { - return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; - } else { - return 0.0f; - } - } - - private synchronized void fetcherFinished(TaskAttemptContext ctx) { - int fetcherSize = fetcherRunners.size(); - if(fetcherSize == 0) { - return; - } - - ctx.getFetchLatch().countDown(); - - int remainFetcher = (int) ctx.getFetchLatch().getCount(); - if (remainFetcher == 0) { - context.setFetcherProgress(FETCHER_PROGRESS); - } else { - context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); - context.setProgressChanged(true); - } - } - - private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, - List<FetchImpl> fetches) throws IOException { - - if (fetches.size() > 0) { - Path inputDir = executionBlockContext.getLocalDirAllocator(). - getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); - - int i = 0; - File storeDir; - File defaultStoreFile; - FileChunk storeChunk = null; - List<Fetcher> runnerList = Lists.newArrayList(); - - for (FetchImpl f : fetches) { - storeDir = new File(inputDir.toString(), f.getName()); - if (!storeDir.exists()) { - storeDir.mkdirs(); - } - - for (URI uri : f.getURIs()) { - defaultStoreFile = new File(storeDir, "in_" + i); - InetAddress address = InetAddress.getByName(uri.getHost()); - - WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); - if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { - boolean hasError = false; - try { - LOG.info("Try to get local file chunk at local host"); - storeChunk = getLocalStoredFileChunk(uri, systemConf); - } catch (Throwable t) { - hasError = true; - } - - // When a range request is out of range, storeChunk will be NULL. This case is normal state. - // So, we should skip and don't need to create storeChunk. - if (storeChunk == null && !hasError) { - continue; - } - - if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 - && hasError == false) { - storeChunk.setFromRemote(false); - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - } else { - storeChunk = new FileChunk(defaultStoreFile, 0, -1); - storeChunk.setFromRemote(true); - } - - // If we decide that intermediate data should be really fetched from a remote host, storeChunk - // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it - storeChunk.setEbId(f.getName()); - Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); - LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); - runnerList.add(fetcher); - i++; - } - } - ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); - return runnerList; - } else { - return Lists.newArrayList(); - } - } - - private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { - // Parse the URI - LOG.info("getLocalStoredFileChunk starts"); - final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); - final List<String> types = params.get("type"); - final List<String> qids = params.get("qid"); - final List<String> taskIdList = params.get("ta"); - final List<String> stageIds = params.get("sid"); - final List<String> partIds = params.get("p"); - final List<String> offsetList = params.get("offset"); - final List<String> lengthList = params.get("length"); - - if (types == null || stageIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); - return null; - } - - if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); - return null; - } - - String queryId = qids.get(0); - String shuffleType = types.get(0); - String sid = stageIds.get(0); - String partId = partIds.get(0); - - if (shuffleType.equals("r") && taskIdList == null) { - LOG.error("Invalid URI - For range shuffle, taskId is required"); - return null; - } - List<String> taskIds = splitMaps(taskIdList); - - FileChunk chunk = null; - long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; - long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; - - LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId - + ", taskIds=" + taskIdList); - - // The working directory of Tajo worker for each query, including stage - String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; - - // If the stage requires a range shuffle - if (shuffleType.equals("r")) { - String ta = taskIds.get(0); - if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { - LOG.warn("Range shuffle - file not exist"); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - try { - chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("getFileChunks() throws exception"); - return null; - } - - // If the stage requires a hash shuffle or a scattered hash shuffle - } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); - String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; - if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { - LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); - return null; - } - Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); - File file = new File(path.toUri()); - long startPos = (offset >= 0 && length >= 0) ? offset : 0; - long readLen = (offset >= 0 && length >= 0) ? length : file.length(); - - if (startPos >= file.length()) { - LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); - return null; - } - chunk = new FileChunk(file, startPos, readLen); - - } else { - LOG.error("Unknown shuffle type"); - return null; - } - - return chunk; - } - - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - - public static Path getTaskAttemptDir(TaskAttemptId quid) { - Path workDir = - StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), - String.valueOf(quid.getTaskId().getId()), - String.valueOf(quid.getId())); - return workDir; - } + public interface Task { + + void init() throws IOException; + + void fetch(); + + void run() throws Exception; + + void kill(); + + void abort(); + + void cleanup(); + + boolean hasFetchPhase(); + + boolean isProgressChanged(); + + boolean isStopped(); + + void updateProgress(); + + TaskAttemptContext getTaskContext(); + + ExecutionBlockContext getExecutionBlockContext(); + ++//<<<<<<< HEAD ++// ++// public TaskAttemptId getTaskId() { ++// return taskId; ++// } ++// ++// public TaskAttemptId getId() { ++// return context.getTaskId(); ++// } ++// ++// public TaskAttemptState getStatus() { ++// return context.getState(); ++// } ++// ++// public String toString() { ++// return "queryId: " + this.getId() + " status: " + this.getStatus(); ++// } ++// ++// public void setState(TaskAttemptState status) { ++// context.setState(status); ++// } ++// ++// public TaskAttemptContext getContext() { ++// return context; ++// } ++// ++// public boolean hasFetchPhase() { ++// return fetcherRunners.size() > 0; ++// } ++// ++// public List<Fetcher> getFetchers() { ++// return new ArrayList<Fetcher>(fetcherRunners); ++// } ++// ++// public void fetch() { ++// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); ++// for (Fetcher f : fetcherRunners) { ++// executorService.submit(new FetchRunner(context, f)); ++// } ++// } ++// ++// public void kill() { ++// stopScriptExecutors(); ++// context.setState(TaskAttemptState.TA_KILLED); ++// context.stop(); ++// } ++// ++// public void abort() { ++// stopScriptExecutors(); ++// context.stop(); ++// } ++// ++// public void cleanUp() { ++// // remove itself from worker ++// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { ++// synchronized (executionBlockContext.getTasks()) { ++// executionBlockContext.getTasks().remove(this.getId()); ++// } ++// } else { ++// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); ++// } ++// } ++// ++// public TaskStatusProto getReport() { ++// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); ++// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); ++// builder.setId(context.getTaskId().getProto()) ++// .setProgress(context.getProgress()) ++// .setState(context.getState()); ++// ++// builder.setInputStats(reloadInputStats()); ++// ++// if (context.getResultStats() != null) { ++// builder.setResultStats(context.getResultStats().getProto()); ++// } ++// return builder.build(); ++// } ++// ++// public boolean isRunning(){ ++// return context.getState() == TaskAttemptState.TA_RUNNING; ++// } ++// ++// public boolean isProgressChanged() { ++// return context.isProgressChanged(); ++// } ++// ++// public void updateProgress() { ++// if(context != null && context.isStopped()){ ++// return; ++// } ++// ++// if (executor != null && context.getProgress() < 1.0f) { ++// context.setExecutorProgress(executor.getProgress()); ++// } ++// } ++// ++// private CatalogProtos.TableStatsProto reloadInputStats() { ++// synchronized(inputStats) { ++// if (this.executor == null) { ++// return inputStats.getProto(); ++// } ++// ++// TableStats executorInputStats = this.executor.getInputStats(); ++// ++// if (executorInputStats != null) { ++// inputStats.setValues(executorInputStats); ++// } ++// return inputStats.getProto(); ++// } ++// } ++// ++// private TaskCompletionReport getTaskCompletionReport() { ++// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); ++// builder.setId(context.getTaskId().getProto()); ++// ++// builder.setInputStats(reloadInputStats()); ++// ++// if (context.hasResultStats()) { ++// builder.setResultStats(context.getResultStats().getProto()); ++// } else { ++// builder.setResultStats(new TableStats().getProto()); ++// } ++// ++// Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); ++// if (it.hasNext()) { ++// do { ++// Entry<Integer, String> entry = it.next(); ++// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); ++// part.setPartId(entry.getKey()); ++// ++// // Set output volume ++// if (context.getPartitionOutputVolume() != null) { ++// for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { ++// if (entry.getKey().equals(e.getKey())) { ++// part.setVolume(e.getValue().longValue()); ++// break; ++// } ++// } ++// } ++// ++// builder.addShuffleFileOutputs(part.build()); ++// } while (it.hasNext()); ++// } ++// ++// return builder.build(); ++// } ++// ++// private void waitForFetch() throws InterruptedException, IOException { ++// context.getFetchLatch().await(); ++// LOG.info(context.getTaskId() + " All fetches are done!"); ++// Collection<String> inputs = Lists.newArrayList(context.getInputTables()); ++// ++// // Get all broadcasted tables ++// Set<String> broadcastTableNames = new HashSet<String>(); ++// List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); ++// if (broadcasts != null) { ++// for (EnforceProperty eachBroadcast : broadcasts) { ++// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); ++// } ++// } ++// ++// // localize the fetched data and skip the broadcast table ++// for (String inputTable: inputs) { ++// if (broadcastTableNames.contains(inputTable)) { ++// continue; ++// } ++// File tableDir = new File(context.getFetchIn(), inputTable); ++// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); ++// context.updateAssignedFragments(inputTable, frags); ++// } ++// } ++// ++// public void run() throws Exception { ++// startTime = System.currentTimeMillis(); ++// Throwable error = null; ++// try { ++// if(!context.isStopped()) { ++// context.setState(TaskAttemptState.TA_RUNNING); ++// if (context.hasFetchPhase()) { ++// // If the fetch is still in progress, the query unit must wait for ++// // complete. ++// waitForFetch(); ++// context.setFetcherProgress(FETCHER_PROGRESS); ++// context.setProgressChanged(true); ++// updateProgress(); ++// } ++// ++// this.executor = executionBlockContext.getTQueryEngine(). ++// createPlan(context, plan); ++// this.executor.init(); ++// ++// while(!context.isStopped() && executor.next() != null) { ++// } ++// } ++// } catch (Throwable e) { ++// error = e ; ++// LOG.error(e.getMessage(), e); ++// stopScriptExecutors(); ++// context.stop(); ++// } finally { ++// if (executor != null) { ++// try { ++// executor.close(); ++// reloadInputStats(); ++// } catch (IOException e) { ++// LOG.error(e, e); ++// } ++// this.executor = null; ++// } ++// ++// executionBlockContext.completedTasksNum.incrementAndGet(); ++// context.getHashShuffleAppenderManager().finalizeTask(taskId); ++// ++// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); ++// if (context.isStopped()) { ++// context.setExecutorProgress(0.0f); ++// ++// if (context.getState() == TaskAttemptState.TA_KILLED) { ++// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); ++// executionBlockContext.killedTasksNum.incrementAndGet(); ++// } else { ++// context.setState(TaskAttemptState.TA_FAILED); ++// TaskFatalErrorReport.Builder errorBuilder = ++// TaskFatalErrorReport.newBuilder() ++// .setId(getId().getProto()); ++// if (error != null) { ++// if (error.getMessage() == null) { ++// errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); ++// } else { ++// errorBuilder.setErrorMessage(error.getMessage()); ++// } ++// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); ++// } ++// ++// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); ++// executionBlockContext.failedTasksNum.incrementAndGet(); ++// } ++// } else { ++// // if successful ++// context.setProgress(1.0f); ++// context.setState(TaskAttemptState.TA_SUCCEEDED); ++// executionBlockContext.succeededTasksNum.incrementAndGet(); ++// ++// TaskCompletionReport report = getTaskCompletionReport(); ++// queryMasterStub.done(null, report, NullCallback.get()); ++// } ++// finishTime = System.currentTimeMillis(); ++// LOG.info(context.getTaskId() + " completed. " + ++// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + ++// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() ++// + ", killed: " + executionBlockContext.killedTasksNum.intValue() ++// + ", failed: " + executionBlockContext.failedTasksNum.intValue()); ++// cleanupTask(); ++// } ++// } ++// ++// public void cleanupTask() { ++// TaskHistory taskHistory = createTaskHistory(); ++// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); ++// executionBlockContext.getTasks().remove(getId()); ++// ++// fetcherRunners.clear(); ++// fetcherRunners = null; ++// try { ++// if(executor != null) { ++// executor.close(); ++// executor = null; ++// } ++// } catch (IOException e) { ++// LOG.fatal(e.getMessage(), e); ++// } ++// ++// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); ++// stopScriptExecutors(); ++// } ++// ++// public TaskHistory createTaskHistory() { ++// TaskHistory taskHistory = null; ++// try { ++// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), ++// startTime, finishTime, reloadInputStats()); ++// ++// if (context.getOutputPath() != null) { ++// taskHistory.setOutputPath(context.getOutputPath().toString()); ++// } ++// ++// if (context.getWorkDir() != null) { ++// taskHistory.setWorkingPath(context.getWorkDir().toString()); ++// } ++// ++// if (context.getResultStats() != null) { ++// taskHistory.setOutputStats(context.getResultStats().getProto()); ++// } ++// ++// if (hasFetchPhase()) { ++// taskHistory.setTotalFetchCount(fetcherRunners.size()); ++// int i = 0; ++// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); ++// for (Fetcher fetcher : fetcherRunners) { ++// // TODO store the fetcher histories ++// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { ++// builder.setStartTime(fetcher.getStartTime()); ++// builder.setFinishTime(fetcher.getFinishTime()); ++// builder.setFileLength(fetcher.getFileLen()); ++// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); ++// builder.setState(fetcher.getState()); ++// ++// taskHistory.addFetcherHistory(builder.build()); ++// } ++// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; ++// } ++// taskHistory.setFinishedFetchCount(i); ++// } ++// } catch (Exception e) { ++// LOG.warn(e.getMessage(), e); ++// } ++// ++// return taskHistory; ++// } ++// ++// public int hashCode() { ++// return context.hashCode(); ++// } ++// ++// public boolean equals(Object obj) { ++// if (obj instanceof Task) { ++// Task other = (Task) obj; ++// return this.context.equals(other.context); ++// } ++// return false; ++// } ++// ++// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) ++// throws IOException { ++// Configuration c = new Configuration(systemConf); ++// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); ++// FileSystem fs = FileSystem.get(c); ++// Path tablePath = new Path(file.getAbsolutePath()); ++// ++// List<FileFragment> listTablets = new ArrayList<FileFragment>(); ++// FileFragment tablet; ++// ++// FileStatus[] fileLists = fs.listStatus(tablePath); ++// for (FileStatus f : fileLists) { ++// if (f.getLen() == 0) { ++// continue; ++// } ++// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); ++// listTablets.add(tablet); ++// } ++// ++// // Special treatment for locally pseudo fetched chunks ++// synchronized (localChunks) { ++// for (FileChunk chunk : localChunks) { ++// if (name.equals(chunk.getEbId())) { ++// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); ++// listTablets.add(tablet); ++// LOG.info("One local chunk is added to listTablets"); ++// } ++// } ++// } ++// ++// FileFragment[] tablets = new FileFragment[listTablets.size()]; ++// listTablets.toArray(tablets); ++// ++// return tablets; ++// } ++// ++// private class FetchRunner implements Runnable { ++// private final TaskAttemptContext ctx; ++// private final Fetcher fetcher; ++// private int maxRetryNum; ++// ++// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { ++// this.ctx = ctx; ++// this.fetcher = fetcher; ++// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); ++// } ++// ++// @Override ++// public void run() { ++// int retryNum = 0; ++// int retryWaitTime = 1000; //sec ++// ++// try { // for releasing fetch latch ++// while(!context.isStopped() && retryNum < maxRetryNum) { ++// if (retryNum > 0) { ++// try { ++// Thread.sleep(retryWaitTime); ++// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds ++// } catch (InterruptedException e) { ++// LOG.error(e); ++// } ++// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); ++// } ++// try { ++// FileChunk fetched = fetcher.get(); ++// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null ++// && fetched.getFile() != null) { ++// if (fetched.fromRemote() == false) { ++// localChunks.add(fetched); ++// LOG.info("Add a new FileChunk to local chunk list"); ++// } ++// break; ++// } ++// } catch (Throwable e) { ++// LOG.error("Fetch failed: " + fetcher.getURI(), e); ++// } ++// retryNum++; ++// } ++// } finally { ++// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ ++// fetcherFinished(ctx); ++// } else { ++// if (retryNum == maxRetryNum) { ++// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); ++// } ++// stopScriptExecutors(); ++// context.stop(); // retry task ++// ctx.getFetchLatch().countDown(); ++// } ++// } ++// } ++// } ++// ++// @VisibleForTesting ++// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { ++// if (totalFetcher > 0) { ++// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; ++// } else { ++// return 0.0f; ++// } ++// } ++// ++// private synchronized void fetcherFinished(TaskAttemptContext ctx) { ++// int fetcherSize = fetcherRunners.size(); ++// if(fetcherSize == 0) { ++// return; ++// } ++// ++// ctx.getFetchLatch().countDown(); ++// ++// int remainFetcher = (int) ctx.getFetchLatch().getCount(); ++// if (remainFetcher == 0) { ++// context.setFetcherProgress(FETCHER_PROGRESS); ++// } else { ++// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); ++// context.setProgressChanged(true); ++// } ++// } ++// ++// private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, ++// List<FetchImpl> fetches) throws IOException { ++// ++// if (fetches.size() > 0) { ++// Path inputDir = executionBlockContext.getLocalDirAllocator(). ++// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); ++// ++// int i = 0; ++// File storeDir; ++// File defaultStoreFile; ++// FileChunk storeChunk = null; ++// List<Fetcher> runnerList = Lists.newArrayList(); ++// ++// for (FetchImpl f : fetches) { ++// storeDir = new File(inputDir.toString(), f.getName()); ++// if (!storeDir.exists()) { ++// storeDir.mkdirs(); ++// } ++// ++// for (URI uri : f.getURIs()) { ++// defaultStoreFile = new File(storeDir, "in_" + i); ++// InetAddress address = InetAddress.getByName(uri.getHost()); ++// ++// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); ++// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { ++// boolean hasError = false; ++// try { ++// LOG.info("Try to get local file chunk at local host"); ++// storeChunk = getLocalStoredFileChunk(uri, systemConf); ++// } catch (Throwable t) { ++// hasError = true; ++// } ++// ++// // When a range request is out of range, storeChunk will be NULL. This case is normal state. ++// // So, we should skip and don't need to create storeChunk. ++// if (storeChunk == null && !hasError) { ++// continue; ++// } ++// ++// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 ++// && hasError == false) { ++// storeChunk.setFromRemote(false); ++// } else { ++// storeChunk = new FileChunk(defaultStoreFile, 0, -1); ++// storeChunk.setFromRemote(true); ++// } ++// } else { ++// storeChunk = new FileChunk(defaultStoreFile, 0, -1); ++// storeChunk.setFromRemote(true); ++// } ++// ++// // If we decide that intermediate data should be really fetched from a remote host, storeChunk ++// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it ++// storeChunk.setEbId(f.getName()); ++// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); ++// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); ++// runnerList.add(fetcher); ++// i++; ++// } ++// } ++// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); ++// return runnerList; ++// } else { ++// return Lists.newArrayList(); ++// } ++// } ++// ++// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { ++// // Parse the URI ++// LOG.info("getLocalStoredFileChunk starts"); ++// final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); ++// final List<String> types = params.get("type"); ++// final List<String> qids = params.get("qid"); ++// final List<String> taskIdList = params.get("ta"); ++// final List<String> stageIds = params.get("sid"); ++// final List<String> partIds = params.get("p"); ++// final List<String> offsetList = params.get("offset"); ++// final List<String> lengthList = params.get("length"); ++// ++// if (types == null || stageIds == null || qids == null || partIds == null) { ++// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); ++// return null; ++// } ++// ++// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { ++// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); ++// return null; ++// } ++// ++// String queryId = qids.get(0); ++// String shuffleType = types.get(0); ++// String sid = stageIds.get(0); ++// String partId = partIds.get(0); ++// ++// if (shuffleType.equals("r") && taskIdList == null) { ++// LOG.error("Invalid URI - For range shuffle, taskId is required"); ++// return null; ++// } ++// List<String> taskIds = splitMaps(taskIdList); ++// ++// FileChunk chunk = null; ++// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; ++// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; ++// ++// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId ++// + ", taskIds=" + taskIdList); ++// ++// // The working directory of Tajo worker for each query, including stage ++// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; ++// ++// // If the stage requires a range shuffle ++// if (shuffleType.equals("r")) { ++// String ta = taskIds.get(0); ++// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { ++// LOG.warn("Range shuffle - file not exist"); ++// return null; ++// } ++// Path path = executionBlockContext.getLocalFS().makeQualified( ++// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); ++// String startKey = params.get("start").get(0); ++// String endKey = params.get("end").get(0); ++// boolean last = params.get("final") != null; ++// ++// try { ++// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); ++// } catch (Throwable t) { ++// LOG.error("getFileChunks() throws exception"); ++// return null; ++// } ++// ++// // If the stage requires a hash shuffle or a scattered hash shuffle ++// } else if (shuffleType.equals("h") || shuffleType.equals("s")) { ++// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); ++// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; ++// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { ++// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); ++// return null; ++// } ++// Path path = executionBlockContext.getLocalFS().makeQualified( ++// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); ++// File file = new File(path.toUri()); ++// long startPos = (offset >= 0 && length >= 0) ? offset : 0; ++// long readLen = (offset >= 0 && length >= 0) ? length : file.length(); ++// ++// if (startPos >= file.length()) { ++// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); ++// return null; ++// } ++// chunk = new FileChunk(file, startPos, readLen); ++// ++// } else { ++// LOG.error("Unknown shuffle type"); ++// return null; ++// } ++// ++// return chunk; ++// } ++// ++// private List<String> splitMaps(List<String> mapq) { ++// if (null == mapq) { ++// return null; ++// } ++// final List<String> ret = new ArrayList<String>(); ++// for (String s : mapq) { ++// Collections.addAll(ret, s.split(",")); ++// } ++// return ret; ++// } ++// ++// public static Path getTaskAttemptDir(TaskAttemptId quid) { ++// Path workDir = ++// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), ++// String.valueOf(quid.getTaskId().getId()), ++// String.valueOf(quid.getId())); ++// return workDir; ++// } ++//======= + TajoWorkerProtocol.TaskStatusProto getReport(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f674fa8f/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --cc tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 5a62487,d020639..5d7a53a --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@@ -381,20 -367,9 +367,20 @@@ public class TaskAttemptContext } return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]); } - + + public String getUniqueKeyFromFragments() { + StringBuilder sb = new StringBuilder(); + for (List<FragmentProto> fragments : fragmentMap.values()) { + for (FragmentProto f : fragments) { + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f); + sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength()); + } + } + return sb.toString(); + } + public int hashCode() { - return Objects.hashCode(queryId); + return Objects.hashCode(taskId); } public boolean equals(Object obj) {
