Repository: tajo Updated Branches: refs/heads/master 0024c75e9 -> 17c6dff4e
TAJO-1309: Add missing break point in physical operator. (jinho) Closes #355 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/17c6dff4 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/17c6dff4 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/17c6dff4 Branch: refs/heads/master Commit: 17c6dff4e258d93e0ffaa1cc07368b2b5d8b8aa4 Parents: 0024c75 Author: jhkim <[email protected]> Authored: Fri Jan 23 10:24:59 2015 +0900 Committer: jhkim <[email protected]> Committed: Fri Jan 23 10:24:59 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../tajo/cli/tsql/commands/HelpCommand.java | 6 +++--- .../engine/planner/physical/BNLJoinExec.java | 3 ++- .../planner/physical/ExternalSortExec.java | 2 +- .../planner/physical/HashAggregateExec.java | 2 +- .../HashBasedColPartitionStoreExec.java | 2 +- .../planner/physical/HashFullOuterJoinExec.java | 4 ++-- .../engine/planner/physical/HashJoinExec.java | 4 ++-- .../planner/physical/HashLeftAntiJoinExec.java | 4 ++-- .../planner/physical/HashLeftOuterJoinExec.java | 4 ++-- .../planner/physical/HashLeftSemiJoinExec.java | 2 +- .../physical/HashShuffleFileWriteExec.java | 2 +- .../engine/planner/physical/HavingExec.java | 2 +- .../engine/planner/physical/MemSortExec.java | 2 +- .../physical/MergeFullOuterJoinExec.java | 3 ++- .../engine/planner/physical/MergeJoinExec.java | 3 ++- .../engine/planner/physical/NLJoinExec.java | 3 ++- .../planner/physical/NLLeftOuterJoinExec.java | 3 ++- .../physical/PartitionMergeScanExec.java | 5 ++--- .../physical/RangeShuffleFileWriteExec.java | 2 +- .../physical/RightOuterMergeJoinExec.java | 3 ++- .../engine/planner/physical/SelectionExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 2 +- .../SortBasedColPartitionStoreExec.java | 2 +- .../engine/planner/physical/StoreTableExec.java | 2 +- .../org/apache/tajo/master/QueryInProgress.java | 10 ++++++--- .../apache/tajo/master/TajoContainerProxy.java | 10 +++++---- .../tajo/worker/TajoResourceAllocator.java | 6 +++--- .../main/java/org/apache/tajo/worker/Task.java | 22 ++++++++------------ .../apache/tajo/worker/TaskAttemptContext.java | 2 +- 30 files changed, 65 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6ba73fe..4e14c93 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,8 @@ Release 0.10.0 - unreleased IMPROVEMENT + TAJO-1309: Add missing break point in physical operator. (jinho) + TAJO-1307: HBaseStorageManager need to support for users to use hbase-site.xml file. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java index 5d41e41..ce56d12 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java @@ -18,11 +18,11 @@ package org.apache.tajo.cli.tsql.commands; -import java.io.PrintWriter; - import org.apache.tajo.cli.tsql.TajoCli; import org.apache.tajo.util.VersionInfo; +import java.io.PrintWriter; + public class HelpCommand extends TajoShellCommand { private String targetDocVersion = ""; @@ -79,7 +79,7 @@ public class HelpCommand extends TajoShellCommand { sout.println(); sout.println("Variables"); - sout.println(" \\set [[NAME] [VALUE] set session variable or list session variables"); + sout.println(" \\set [NAME] [VALUE] set session variable or list session variables"); sout.println(" \\unset NAME unset session variable"); sout.println(); sout.println(); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 117b04c..14cf567 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -128,7 +128,7 @@ public class BNLJoinExec extends BinaryPhysicalExec { rightEnd = true; } - while (true) { + while (!context.isStopped()) { if (!rightIterator.hasNext()) { // if leftIterator ended if (leftIterator.hasNext()) { // if rightTupleslot remains leftTuple = leftIterator.next(); @@ -201,6 +201,7 @@ public class BNLJoinExec extends BinaryPhysicalExec { return outputTuple; } } + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 4e19114..c3f9d3d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -203,7 +203,7 @@ public class ExternalSortExec extends SortExec { int chunkId = 0; long runStartTime = System.currentTimeMillis(); - while ((tuple = child.next()) != null) { // partition sort start + while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start Tuple vtuple = new VTuple(tuple); inMemoryTable.add(vtuple); memoryConsumption += MemoryUtil.calculateMemorySize(vtuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java index 80bba2b..0d1bf3d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java @@ -48,7 +48,7 @@ public class HashAggregateExec extends AggregationExec { private void compute() throws IOException { Tuple tuple; Tuple keyTuple; - while((tuple = child.next()) != null && !context.isStopped()) { + while(!context.isStopped() && (tuple = child.next()) != null) { keyTuple = new VTuple(groupingKeyIds.length); // build one key tuple for(int i = 0; i < groupingKeyIds.length; i++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index c28a5cd..e94bc26 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -67,7 +67,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { public Tuple next() throws IOException { Tuple tuple; StringBuilder sb = new StringBuilder(); - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { // set subpartition directory name sb.delete(0, sb.length()); if (keyIds != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 28d9a3e..9cd13fb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -144,7 +144,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -208,7 +208,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 701297f..38728b5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -111,7 +111,7 @@ public class HashJoinExec extends BinaryPhysicalExec { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -156,7 +156,7 @@ public class HashJoinExec extends BinaryPhysicalExec { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index 236f5e3..cceed3e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -64,7 +64,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec { Tuple rightTuple; boolean notFound; - while(!finished) { + while(!context.isStopped() && !finished) { // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -89,7 +89,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec { // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket. // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket. notFound = true; - while (notFound && iterator.hasNext()) { + while (!context.isStopped() && notFound && iterator.hasNext()) { rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index c1b6522..233ef92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -138,7 +138,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer @@ -204,7 +204,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 5196a63..37c6d0e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -70,7 +70,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec { Tuple rightTuple; boolean notFound; - while(!finished) { + while(!context.isStopped() && !finished) { // getting new outer leftTuple = leftChild.next(); // it comes from a disk http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 3c4949f..28974f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -103,7 +103,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { int partId; int tupleCount = 0; long numRows = 0; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { tupleCount++; numRows++; http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java index f9f4351..e9a7c03 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java @@ -39,7 +39,7 @@ public class HavingExec extends UnaryPhysicalExec { @Override public Tuple next() throws IOException { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { if (qual.eval(inSchema, tuple).isTrue()) { return tuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index 13fec7b..c77313e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -51,7 +51,7 @@ public class MemSortExec extends SortExec { if (!sorted) { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { tupleSlots.add(new VTuple(tuple)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index cb2552b..3f2e431 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -109,7 +109,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { boolean newRound = false; if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) { newRound = true; @@ -313,6 +313,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec { } } // the second if end false } // for + return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 13104ee..63f48ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -102,7 +102,7 @@ public class MergeJoinExec extends BinaryPhysicalExec { public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { if (!outerIterator.hasNext() && !innerIterator.hasNext()) { if(end){ return null; @@ -170,6 +170,7 @@ public class MergeJoinExec extends BinaryPhysicalExec { return outTuple; } } + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index b5c6244..5e7ab98 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -67,7 +67,7 @@ public class NLJoinExec extends BinaryPhysicalExec { } public Tuple next() throws IOException { - for (;;) { + while (!context.isStopped()) { if (needNewOuter) { outerTuple = leftChild.next(); if (outerTuple == null) { @@ -94,6 +94,7 @@ public class NLJoinExec extends BinaryPhysicalExec { return outTuple; } } + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java index 8ff7570..7959d47 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java @@ -73,7 +73,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { } public Tuple next() throws IOException { - for (;;) { + while (!context.isStopped()) { if (needNextRightTuple) { leftTuple = leftChild.next(); if (leftTuple == null) { @@ -112,6 +112,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec { return outTuple; } } + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 5297e2c..5692308 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -21,9 +21,8 @@ package org.apache.tajo.engine.planner.physical; import com.google.common.collect.Lists; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -70,7 +69,7 @@ public class PartitionMergeScanExec extends PhysicalExec { @Override public Tuple next() throws IOException { Tuple tuple; - while (currentScanner != null) { + while (!context.isStopped() && currentScanner != null) { tuple = currentScanner.next(); if (tuple != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 119f053..8da1a03 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -96,7 +96,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { long offset; - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { offset = appender.getOffset(); appender.addTuple(tuple); keyTuple = new VTuple(keySchema.size()); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index a02d00b..5e80b8f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -129,7 +129,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { boolean newRound = false; if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) { newRound = true; @@ -339,6 +339,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec { } } // the second if end false } // for + return null; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index 9e84462..b9273fa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -44,7 +44,7 @@ public class SelectionExec extends UnaryPhysicalExec { @Override public Tuple next() throws IOException { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { if (qual.eval(inSchema, tuple).isTrue()) { return tuple; } http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 94cd4ed..15f17fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -251,7 +251,7 @@ public class SeqScanExec extends PhysicalExec { initScanner(projected); List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>(); - while (true) { + while (!context.isStopped()) { Tuple tuple = next(); if (tuple != null) { broadcastTupleCacheList.add(tuple); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index f7c20fc..ca90b0e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -73,7 +73,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { @Override public Tuple next() throws IOException { Tuple tuple; - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { fillKeyTuple(tuple, currentKey); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 3d3da5c..5622699 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -115,7 +115,7 @@ public class StoreTableExec extends UnaryPhysicalExec { */ @Override public Tuple next() throws IOException { - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { appender.addTuple(tuple); if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 352ec46..df461c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -80,8 +80,12 @@ public class QueryInProgress { public synchronized void kill() { getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - if(queryMasterRpcClient != null){ - queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + if (queryMasterRpcClient != null) { + try { + queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + } catch (Throwable e) { + catchException(e); + } } } @@ -165,7 +169,7 @@ public class QueryInProgress { } } - public void catchException(Exception e) { + public void catchException(Throwable e) { LOG.error(e.getMessage(), e); queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED); queryInfo.setLastMessage(StringUtils.stringifyException(e)); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 588b7ee..42ffd87 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -31,6 +31,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.event.TaskFatalErrorEvent; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.querymaster.QueryMasterTask; @@ -82,8 +83,9 @@ public class TajoContainerProxy extends ContainerProxy { tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); - } catch (Exception e) { - LOG.error(e.getMessage(), e); + } catch (Throwable e) { + /* Worker RPC failure */ + context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); } finally { RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } @@ -111,7 +113,7 @@ public class TajoContainerProxy extends ContainerProxy { .build(); tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); @@ -198,7 +200,7 @@ public class TajoContainerProxy extends ContainerProxy { .addAllContainerIds(containerIdProtos) .build(), NullCallback.get()); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(tmClient); http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 7278317..dd408c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -128,7 +128,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { for(ContainerProxy eachProxy: list) { try { eachProxy.stopContainer(); - } catch (Exception e) { + } catch (Throwable e) { LOG.warn(e.getMessage()); } } @@ -301,7 +301,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(tmClient); @@ -363,7 +363,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { containerIds.add(eachContainer.getId()); } TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } return; http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 5f9c6ac..e9ad838 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -88,8 +88,6 @@ public class Task { private final Map<String, TableDesc> descs = Maps.newHashMap(); private PhysicalExec executor; private boolean interQuery; - private boolean killed = false; - private boolean aborted = false; private Path inputTableBaseDir; private long startTime; @@ -254,13 +252,11 @@ public class Task { } public void kill() { - killed = true; - context.stop(); context.setState(TaskAttemptState.TA_KILLED); + context.stop(); } public void abort() { - aborted = true; context.stop(); } @@ -299,7 +295,7 @@ public class Task { } public void updateProgress() { - if(killed || aborted){ + if(context != null && context.isStopped()){ return; } @@ -403,12 +399,12 @@ public class Task { createPlan(context, plan); this.executor.init(); - while(!killed && !aborted && executor.next() != null) { + while(!context.isStopped() && executor.next() != null) { } } catch (Throwable e) { error = e ; LOG.error(e.getMessage(), e); - aborted = true; + context.stop(); } finally { if (executor != null) { try { @@ -423,10 +419,10 @@ public class Task { executionBlockContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub(); - if (killed || aborted) { + if (context.isStopped()) { context.setExecutorProgress(0.0f); - if(killed) { - context.setState(TaskAttemptState.TA_KILLED); + + if(context.getState() == TaskAttemptState.TA_KILLED) { queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); executionBlockContext.killedTasksNum.incrementAndGet(); } else { @@ -593,7 +589,7 @@ public class Task { int retryWaitTime = 1000; //sec try { // for releasing fetch latch - while(!killed && retryNum < maxRetryNum) { + while(!context.isStopped() && retryNum < maxRetryNum) { if (retryNum > 0) { try { Thread.sleep(retryWaitTime); @@ -625,7 +621,7 @@ public class Task { if (retryNum == maxRetryNum) { LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); } - aborted = true; // retry task + context.stop(); // retry task ctx.getFetchLatch().countDown(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/17c6dff4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 3092c47..1f2c325 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -70,7 +70,7 @@ public class TaskAttemptContext { /** a map of shuffled file outputs */ private Map<Integer, String> shuffleFileOutputs; private File fetchIn; - private boolean stopped = false; + private volatile boolean stopped = false; private boolean interQuery = false; private Path outputPath; private DataChannel dataChannel;
