Repository: hive Updated Branches: refs/heads/llap a35f1adb7 -> cb24a7678
HIVE-10896 : LLAP: the return of the stuck DAG (Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb24a767 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb24a767 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb24a767 Branch: refs/heads/llap Commit: cb24a76783ac3522ce415e51656208f2e24ee11b Parents: a35f1ad Author: Sergey Shelukhin <[email protected]> Authored: Wed Jun 3 11:33:13 2015 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Jun 3 11:33:13 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/AbstractFileMergeOperator.java | 5 +- .../hive/ql/exec/AbstractMapJoinOperator.java | 6 +- .../hive/ql/exec/AppMasterEventOperator.java | 5 +- .../hadoop/hive/ql/exec/CollectOperator.java | 5 +- .../hadoop/hive/ql/exec/CommonJoinOperator.java | 5 +- .../hive/ql/exec/CommonMergeJoinOperator.java | 5 +- .../hadoop/hive/ql/exec/DemuxOperator.java | 6 +- .../hadoop/hive/ql/exec/DummyStoreOperator.java | 5 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 5 +- .../hadoop/hive/ql/exec/FilterOperator.java | 5 +- .../hadoop/hive/ql/exec/ForwardOperator.java | 4 +- .../hadoop/hive/ql/exec/GroupByOperator.java | 5 +- .../hive/ql/exec/HashTableDummyOperator.java | 5 +- .../hive/ql/exec/HashTableSinkOperator.java | 6 +- .../hadoop/hive/ql/exec/JoinOperator.java | 5 +- .../ql/exec/LateralViewForwardOperator.java | 4 +- .../hive/ql/exec/LateralViewJoinOperator.java | 6 +- .../hadoop/hive/ql/exec/LimitOperator.java | 5 +- .../hadoop/hive/ql/exec/ListSinkOperator.java | 5 +- .../hadoop/hive/ql/exec/MapJoinOperator.java | 10 +--- .../apache/hadoop/hive/ql/exec/MapOperator.java | 4 +- .../apache/hadoop/hive/ql/exec/MuxOperator.java | 5 +- .../apache/hadoop/hive/ql/exec/Operator.java | 62 +++++++++++++------- .../apache/hadoop/hive/ql/exec/PTFOperator.java | 5 +- .../hadoop/hive/ql/exec/ReduceSinkOperator.java | 5 +- .../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 5 +- .../hadoop/hive/ql/exec/ScriptOperator.java | 5 +- .../hadoop/hive/ql/exec/SelectOperator.java | 7 +-- .../ql/exec/SparkHashTableSinkOperator.java | 5 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 13 ++-- .../hadoop/hive/ql/exec/UDTFOperator.java | 5 +- .../hadoop/hive/ql/exec/UnionOperator.java | 5 +- .../vector/VectorAppMasterEventOperator.java | 8 +-- .../ql/exec/vector/VectorFileSinkOperator.java | 9 +-- .../ql/exec/vector/VectorFilterOperator.java | 5 +- .../ql/exec/vector/VectorGroupByOperator.java | 6 +- .../exec/vector/VectorMapJoinBaseOperator.java | 7 +-- .../ql/exec/vector/VectorMapJoinOperator.java | 6 +- .../VectorMapJoinOuterFilteredOperator.java | 6 +- .../exec/vector/VectorReduceSinkOperator.java | 7 +-- .../exec/vector/VectorSMBMapJoinOperator.java | 5 +- .../ql/exec/vector/VectorSelectOperator.java | 7 +-- .../mapjoin/VectorMapJoinCommonOperator.java | 6 +- .../vector/util/FakeCaptureOutputOperator.java | 5 +- .../util/FakeVectorDataSourceOperator.java | 4 +- 45 files changed, 135 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index 1e70602..f1c32b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -65,8 +65,8 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> protected transient DynamicPartitionCtx dpCtx; @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); this.jc = new JobConf(hconf); incompatFileSet = new HashSet<Path>(); autoDelete = false; @@ -94,7 +94,6 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> throw new HiveException("Failed to initialize AbstractFileMergeOperator", e); } - return result; } // sets up temp and task temp path http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index f948861..7302688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -59,7 +59,7 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co @Override @SuppressWarnings("unchecked") - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { if (conf.getGenJoinKeys()) { int tagLen = conf.getTagLength(); joinKeys = new List[tagLen]; @@ -68,7 +68,7 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co inputObjInspectors,NOTSKIPBIGTABLE, tagLen); } - Collection<Future<?>> result = super.initializeOp(hconf); + super.initializeOp(hconf); numMapRowsRead = 0; @@ -82,8 +82,6 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co posBigTable, joinCacheSize,spillTableDesc, conf, !hasFilter(posBigTable), reporter); storage[posBigTable] = bigPosRC; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java index dd2be03..7114177 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java @@ -52,14 +52,13 @@ public class AppMasterEventOperator extends Operator<AppMasterEventDesc> { protected transient long MAX_SIZE; @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); MAX_SIZE = HiveConf.getLongVar(hconf, ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING_MAX_EVENT_SIZE); serializer = (Serializer) ReflectionUtils.newInstance(conf.getTable().getDeserializerClass(), null); initDataBuffer(false); - return result; } protected void initDataBuffer(boolean skipPruning) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java index a7fbfe7..e2f4f58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java @@ -43,11 +43,10 @@ public class CollectOperator extends Operator<CollectDesc> implements transient int maxSize; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); rowList = new ArrayList<Object>(); maxSize = conf.getBufferSize().intValue(); - return result; } boolean firstRow = true; http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 55c5079..bcb9fce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -187,8 +187,8 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends @Override @SuppressWarnings("unchecked") - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); this.handleSkewJoin = conf.getHandleSkewJoin(); this.hconf = hconf; @@ -322,7 +322,6 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends if (isLogInfoEnabled) { LOG.info("JOIN " + outputObjInspector.getTypeName() + " totalsz = " + totalSz); } - return result; } transient boolean newGroupStarted = false; http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java index d1d5e2b..6c57db3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -95,8 +95,8 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge @SuppressWarnings("unchecked") @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); firstFetchHappened = false; fetchInputAtClose = getFetchInputAtCloseList(); @@ -148,7 +148,6 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge } sources = ((TezContext) MapredContext.get()).getRecordSources(); - return result; } /* http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java index b7b054a..41389bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java @@ -110,8 +110,8 @@ public class DemuxOperator extends Operator<DemuxDesc> private int[][] newChildOperatorsTag; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); // A DemuxOperator should have at least one child if (childOperatorsArray.length == 0) { throw new HiveException( @@ -183,7 +183,7 @@ public class DemuxOperator extends Operator<DemuxDesc> if (isLogInfoEnabled) { LOG.info("newChildOperatorsTag " + Arrays.toString(newChildOperatorsTag)); } - return result; + } private int[] toArray(List<Integer> list) { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java index 6a68059..0c12570 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java @@ -74,8 +74,8 @@ public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Seri } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> ret = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); /* * The conversion to standard object inspector was necessitated by HIVE-5973. The issue * happens when a select operator preceeds this operator as in the case of a subquery. The @@ -90,7 +90,6 @@ public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Seri */ outputObjInspector = ObjectInspectorUtils.getStandardObjectInspector(inputObjInspectors[0]); result = new InspectableObject(null, outputObjInspector); - return ret; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 553113e..0499b70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -325,8 +325,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); try { this.hconf = hconf; filesCreated = false; @@ -445,7 +445,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements e.printStackTrace(); throw new HiveException(e); } - return result; } private void logOutputFormatError(Configuration hconf, HiveException ex) { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java index 65301c0..ed78593 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java @@ -50,8 +50,8 @@ public class FilterOperator extends Operator<FilterDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); try { heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); @@ -65,7 +65,6 @@ public class FilterOperator extends Operator<FilterDesc> implements } catch (Throwable e) { throw new HiveException(e); } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java index 6cd8c80..7a4c58a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java @@ -62,7 +62,7 @@ public class ForwardOperator extends Operator<ForwardDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - return super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 9867739..1693ec3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -180,8 +180,8 @@ public class GroupByOperator extends Operator<GroupByDesc> { } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); numRowsInput = 0; numRowsHashTbl = 0; @@ -393,7 +393,6 @@ public class GroupByOperator extends Operator<GroupByDesc> { memoryMXBean = ManagementFactory.getMemoryMXBean(); maxMemory = memoryMXBean.getHeapMemoryUsage().getMax(); memoryThreshold = this.getConf().getMemoryThreshold(); - return result; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java index 2829a9d..1de8c76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java @@ -33,8 +33,8 @@ public class HashTableDummyOperator extends Operator<HashTableDummyDesc> impleme private static final long serialVersionUID = 1L; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); TableDesc tbl = this.getConf().getTbl(); try { Deserializer serde = tbl.getDeserializerClass().newInstance(); @@ -44,7 +44,6 @@ public class HashTableDummyOperator extends Operator<HashTableDummyDesc> impleme LOG.error("Generating output obj inspector from dummy object error", e); e.printStackTrace(); } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index 96283cd..ad93623 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -116,8 +116,8 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i @Override @SuppressWarnings("unchecked") - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT); console = new LogHelper(LOG, isSilent); memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage()); @@ -192,7 +192,7 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i } catch (SerDeException e) { throw new HiveException(e); } - return result; + } public MapJoinTableContainer[] getMapJoinTables() { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 069d91a..3b92ab6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -56,15 +56,14 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial private final transient LongWritable skewjoin_followup_jobs = new LongWritable(0); @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); if (handleSkewJoin) { skewJoinKeyContext = new SkewJoinHandler(this); skewJoinKeyContext.initiliaze(hconf); skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs); } statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs); - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java index e1479c0..e866eed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java @@ -56,7 +56,7 @@ public class LateralViewForwardOperator extends Operator<LateralViewForwardDesc> } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - return super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java index 15b8387..55bb08f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java @@ -84,8 +84,8 @@ public class LateralViewJoinOperator extends Operator<LateralViewJoinDesc> { public static final byte UDTF_TAG = 1; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); ArrayList<String> fieldNames = conf.getOutputInternalColNames(); @@ -107,8 +107,6 @@ public class LateralViewJoinOperator extends Operator<LateralViewJoinDesc> { outputObjInspector = ObjectInspectorFactory .getStandardStructObjectInspector(fieldNames, ois); - return result; - } // acc is short for accumulator. It's used to build the row before forwarding http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java index 86519a6..8fe96be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java @@ -39,13 +39,12 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable { protected transient boolean isMap; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); limit = conf.getLimit(); leastRow = conf.getLeastRows(); currCount = 0; isMap = hconf.getBoolean("mapred.task.is.map", true); - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java index 87917dc..919e72f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java @@ -44,14 +44,13 @@ public class ListSinkOperator extends Operator<ListSinkDesc> { private transient int numRows; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); try { fetcher = initializeFetcher(hconf); } catch (Exception e) { throw new HiveException(e); } - return result; } private FetchFormatter initializeFetcher(Configuration conf) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index cb1f44d..2405d41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -122,14 +122,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { this.hconf = hconf; unwrapContainer = new UnwrapRowContainer[conf.getTagLength()]; - Collection<Future<?>> result = super.initializeOp(hconf); - if (result == null) { - result = new HashSet<Future<?>>(); - } + super.initializeOp(hconf); int tagLen = conf.getTagLength(); @@ -175,13 +172,12 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem return loadHashTable(mapContext, mrContext); } }); - result.add(future); + asyncInitOperations.add(future); } else if (mapContext == null || mapContext.getLocalWork() == null || mapContext.getLocalWork().getInputFileChangeSensitive() == false) { loadHashTable(mapContext, mrContext); hashTblInitedOnce = true; } - return result; } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 014d2a1..00992d1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -420,8 +420,8 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - return super.initializeOp(hconf); + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); } public void initializeMapOperator(Configuration hconf) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java index eb4dff3..2760a8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java @@ -171,8 +171,8 @@ public class MuxOperator extends Operator<MuxDesc> implements Serializable{ private transient long[] nextCntrs; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); // A MuxOperator should only have a single child if (childOperatorsArray.length != 1) { @@ -208,7 +208,6 @@ public class MuxOperator extends Operator<MuxDesc> implements Serializable{ cntrs[i] = 0; nextCntrs[i] = 1; } - return result; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 3a6c5f1..63eed9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -72,6 +72,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C protected AtomicBoolean abortOp; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; + protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>(); private static AtomicInteger seqId; @@ -362,39 +363,58 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C // derived classes can set this to different object if needed outputObjInspector = inputObjInspectors[0]; - Collection<Future<?>> asyncInitOperations = initializeOp(hconf); - - // sanity checks - if (!rootInitializeCalled - || asyncInitOperations == null - || childOperatorsArray.length != childOperators.size()) { - throw new AssertionError("Internal error during operator initialization"); - } + boolean isInitOk = false; + try { + initializeOp(hconf); + // sanity checks + if (!rootInitializeCalled + || childOperatorsArray.length != childOperators.size()) { + throw new AssertionError("Internal error during operator initialization"); + } + if (isLogInfoEnabled) { + LOG.info("Initialization Done " + id + " " + getName()); + } - if (isLogInfoEnabled) { - LOG.info("Initialization Done " + id + " " + getName()); + initializeChildren(hconf); + isInitOk = true; + } finally { + // TODO: ugly hack because Java doesn't have dtors and Tez input hangs on shutdown. + if (!isInitOk) { + cancelAsyncInitOps(); + } } - initializeChildren(hconf); - // let's wait on the async ops before continuing completeInitialization(asyncInitOperations); } + private void cancelAsyncInitOps() { + for (Future<?> f : asyncInitOperations) { + f.cancel(true); + } + asyncInitOperations.clear(); + } + private void completeInitialization(Collection<Future<?>> fs) throws HiveException { Object[] os = new Object[fs.size()]; int i = 0; + Throwable asyncEx = null; for (Future<?> f : fs) { - try { - if (abortOp.get()) { - f.cancel(true); - } else { + if (abortOp.get() || asyncEx != null) { + // We were aborted, interrupted or one of the operations failed; terminate all. + f.cancel(true); + } else { + try { os[i++] = f.get(); + } catch (Throwable t) { + f.cancel(true); + asyncEx = t; } - } catch (Exception e) { - throw new HiveException(e); } } + if (asyncEx != null) { + throw new HiveException("Async initialization failed", asyncEx); + } completeInitializationOp(os); } @@ -421,9 +441,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C /** * Operator specific initialization. */ - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { rootInitializeCalled = true; - return new ArrayList<Future<?>>(); } /** @@ -1363,8 +1382,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C } @Override - protected Collection<Future<?>> initializeOp(Configuration conf) { - return childOperators; + protected void initializeOp(Configuration conf) { } } } http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java index 7d465d2..113ac21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java @@ -67,8 +67,8 @@ public class PTFOperator extends Operator<PTFDesc> implements Serializable { * 4. Create input partition to store rows coming from previous operator */ @Override - protected Collection<Future<?>> initializeOp(Configuration jobConf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(jobConf); + protected void initializeOp(Configuration jobConf) throws HiveException { + super.initializeOp(jobConf); hiveConf = jobConf; isMapOperator = conf.isMapSide(); @@ -86,7 +86,6 @@ public class PTFOperator extends Operator<PTFDesc> implements Serializable { ptfInvocation = setupChain(); ptfInvocation.initializeStreaming(jobConf, isMapOperator); firstMapRow = true; - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 859a28f..81650ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -156,8 +156,8 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> private final transient LongWritable recordCounter = new LongWritable(); @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); try { numRows = 0; @@ -248,7 +248,6 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> LOG.error(msg, e); throw new RuntimeException(e); } - return result; } http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 390a12e..b094fd9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -93,7 +93,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not // get initialized at all. Consider the following query: @@ -101,7 +101,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp // For the mapper processing C, The SMJ is not initialized, no need to close it either. initDone = true; - Collection<Future<?>> result = super.initializeOp(hconf); + super.initializeOp(hconf); closeCalled = false; @@ -156,7 +156,6 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp } foundNextKeyGroup[pos] = false; } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index f2eed44..35ed67e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -261,8 +261,8 @@ public class ScriptOperator extends Operator<ScriptDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); firstRow = true; statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count); @@ -286,7 +286,6 @@ public class ScriptOperator extends Operator<ScriptDesc> implements } catch (Exception e) { throw new HiveException(ErrorMsg.SCRIPT_INIT_ERROR.getErrorCodedMsg(), e); } - return result; } boolean isBrokenPipeException(IOException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java index cd7fb92..b1b8459 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java @@ -44,12 +44,12 @@ public class SelectOperator extends Operator<SelectDesc> implements Serializable private transient boolean isSelectStarNoCompute = false; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); // Just forward the row as is if (conf.isSelStarNoCompute()) { isSelectStarNoCompute = true; - return result; + return; } List<ExprNodeDesc> colList = conf.getColList(); eval = new ExprNodeEvaluator[colList.size()]; @@ -66,7 +66,6 @@ public class SelectOperator extends Operator<SelectDesc> implements Serializable } outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(), inputObjInspectors[0]); - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index 94144a2..4a5f0b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@ -61,14 +61,13 @@ public class SparkHashTableSinkOperator } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()]; inputOIs[tag] = inputObjInspectors[0]; conf.setTagOrder(new Byte[]{ tag }); htsOperator.setConf(conf); htsOperator.initialize(hconf, inputOIs); - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cbf02e9..a586bc8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -193,17 +193,17 @@ public class TableScanOperator extends Operator<TableScanDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); inputFileChanged = false; if (conf == null) { - return result; + return; } rowLimit = conf.getRowLimit(); if (!conf.isGatherStats()) { - return result; + return; } this.hconf = hconf; @@ -217,11 +217,6 @@ public class TableScanOperator extends Operator<TableScanDesc> implements defaultPartitionName = HiveConf.getVar(hconf, HiveConf.ConfVars.DEFAULTPARTITIONNAME); currentStat = null; stats = new HashMap<String, Stat>(); - if (conf.getPartColumns() == null || conf.getPartColumns().size() == 0) { - // NON PARTITIONED table - return result; - } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java index 94cecd0..e64fa7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java @@ -59,8 +59,8 @@ public class UDTFOperator extends Operator<UDTFDesc> implements Serializable { transient AutoProgressor autoProgressor; @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); genericUDTF = conf.getGenericUDTF(); collector = new UDTFCollector(this); @@ -93,7 +93,6 @@ public class UDTFOperator extends Operator<UDTFDesc> implements Serializable { hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS)); autoProgressor.go(); } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java index 9bbaadd..a49097c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java @@ -56,8 +56,8 @@ public class UnionOperator extends Operator<UnionDesc> implements Serializable { * needsTransform[]. */ @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); int parents = parentOperators.size(); parentObjInspectors = new StructObjectInspector[parents]; @@ -119,7 +119,6 @@ public class UnionOperator extends Operator<UnionDesc> implements Serializable { + "] from " + inputObjInspectors[p] + " to " + outputObjInspector); } } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java index df82d21..e4ca2cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAppMasterEventOperator.java @@ -63,7 +63,7 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + public void initializeOp(Configuration hconf) throws HiveException { // We need a input object inspector that is for the row we will extract out of the // vectorized row batch, not for example, an original inspector for an ORC table, etc. @@ -71,12 +71,8 @@ public class VectorAppMasterEventOperator extends AppMasterEventOperator { VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]); // Call AppMasterEventOperator with new input inspector. - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); - + super.initializeOp(hconf); firstBatch = true; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java index 2ccc9a5..09d4a8e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java @@ -59,20 +59,15 @@ public class VectorFileSinkOperator extends FileSinkOperator { } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { // We need a input object inspector that is for the row we will extract out of the // vectorized row batch, not for example, an original inspector for an ORC table, etc. inputObjInspectors[0] = VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]); - - // Call FileSinkOperator with new input inspector. - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); + super.initializeOp(hconf); firstBatch = true; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java index d1b8939..ad65914 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java @@ -61,8 +61,8 @@ public class VectorFilterOperator extends FilterOperator { @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); try { heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT); @@ -77,7 +77,6 @@ public class VectorFilterOperator extends FilterOperator { filterMode = -1; } } - return result; } public void setFilterCondition(VectorExpression expr) { http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 39a83e3..a81d107 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -774,9 +774,8 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(); @@ -839,7 +838,6 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements processingMode = this.new ProcessingModeHashAggregate(); } processingMode.initialize(hconf); - return result; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java index 0baec2c..b9f42dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java @@ -86,9 +86,8 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - - Collection<Future<?>> result = super.initializeOp(hconf); + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); @@ -96,8 +95,6 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector outputBatch = vrbCtx.createVectorizedRowBatch(); outputVectorAssignRowMap = new HashMap<ObjectInspector, VectorAssignRowSameBatch>(); - - return result; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index e9bd44a..007782d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -99,7 +99,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + public void initializeOp(Configuration hconf) throws HiveException { // Use a final variable to properly parameterize the processVectorInspector closure. // Using a member variable in the closure will not do the right thing... @@ -118,7 +118,7 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { }); singleRow = new Object[rowWriters.length]; - Collection<Future<?>> result = super.initializeOp(hconf); + super.initializeOp(hconf); List<ExprNodeDesc> keyDesc = conf.getKeys().get(posBigTable); keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc); @@ -176,8 +176,6 @@ public class VectorMapJoinOperator extends VectorMapJoinBaseOperator { // Filtering is handled in the input batch processing filterMaps[posBigTable] = null; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java index a96816f..b8b1f88 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOuterFilteredOperator.java @@ -60,7 +60,7 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + public void initializeOp(Configuration hconf) throws HiveException { final int posBigTable = conf.getPosBigTable(); @@ -71,11 +71,9 @@ public class VectorMapJoinOuterFilteredOperator extends VectorMapJoinBaseOperato // Call super VectorMapJoinOuterFilteredOperator, which calls super MapJoinOperator with // new input inspector. - Collection<Future<?>> result = super.initializeOp(hconf); + super.initializeOp(hconf); firstBatch = true; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java index 2c42d1f..41b2874 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java @@ -57,7 +57,7 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator { } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + protected void initializeOp(Configuration hconf) throws HiveException { // We need a input object inspector that is for the row we will extract out of the // vectorized row batch, not for example, an original inspector for an ORC table, etc. @@ -65,12 +65,9 @@ public class VectorReduceSinkOperator extends ReduceSinkOperator { VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]); // Call ReduceSinkOperator with new input inspector. - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); + super.initializeOp(hconf); firstBatch = true; - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java index a2f8091..323007e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java @@ -131,8 +131,8 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); @@ -214,7 +214,6 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect } // Now replace the old evaluators with our own joinValues[posBigTable] = vectorNodeEvaluators; - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 212aa99..7e95478 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -83,11 +83,11 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); // Just forward the row as is if (conf.isSelStarNoCompute()) { - return null; + return; } List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>(); @@ -106,7 +106,6 @@ public class VectorSelectOperator extends Operator<SelectDesc> implements for (int i = 0; i < projectedColumns.length; i++) { projectedColumns[i] = vExpressions[i].getOutputColumn(); } - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index af78776..6683e75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -545,8 +545,8 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); if (LOG.isDebugEnabled()) { // Determine the name of our map or reduce task for debug tracing. @@ -606,8 +606,6 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem i++; } } - - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java index 93a6aed..7454b01 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java @@ -74,10 +74,9 @@ public class FakeCaptureOutputOperator extends Operator<FakeCaptureOutputDesc> } @Override - public Collection<Future<?>> initializeOp(Configuration conf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(conf); + public void initializeOp(Configuration conf) throws HiveException { + super.initializeOp(conf); rows = new ArrayList<Object>(); - return result; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/cb24a767/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java index fe990f8..3bea307 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeVectorDataSourceOperator.java @@ -62,8 +62,8 @@ public class FakeVectorDataSourceOperator extends Operator<FakeVectorDataSourceO } @Override - public Collection<Future<?>> initializeOp(Configuration conf) throws HiveException { - return super.initializeOp(conf); + public void initializeOp(Configuration conf) throws HiveException { + super.initializeOp(conf); } @Override
